diff --git a/DEPS.bzl b/DEPS.bzl index 91254eb81f34f..c600fa4b560db 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -7766,13 +7766,13 @@ def go_deps(): build_tags = ["nextgen", "intest"], build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sha256 = "daff9dec412c452d98f2adf2e2c4ac5132e93acd0f773612c69681cd6f76b4d4", - strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260326064539-728676760deb", + sha256 = "06b88aebfa4f416d2fd77edb70d51f8c590f61f71be6cbfb328e525e9907d027", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260330060945-282ada62b856", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip", - "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index f95af192719fc..186f2f6fd2d56 100644 --- a/go.mod +++ b/go.mod @@ -124,7 +124,7 @@ require ( github.com/stathat/consistent v1.0.0 github.com/stretchr/testify v1.11.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb + github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856 github.com/tikv/pd/client v0.0.0-20260323032024-d7b638033a14 github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 github.com/twmb/murmur3 v1.1.6 diff --git a/go.sum b/go.sum index 21f1dfbda3d8b..a015ee353ed31 100644 --- a/go.sum +++ b/go.sum @@ -895,8 +895,8 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= -github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb h1:ZVxuRRtpKp76G4zNePQ9vLV2UwEf8nJjr6Z2kAHTbmg= -github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb/go.mod h1:2itxkVYz1s+dKsfJZqKcFD2MGUoDxvyy/Yqjd7vkdjg= +github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856 h1:CJ2alESXl+dW4hQwO1vY5ingIDr6dGRB+8WodZeh24g= +github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856/go.mod h1:2itxkVYz1s+dKsfJZqKcFD2MGUoDxvyy/Yqjd7vkdjg= github.com/tikv/pd/client v0.0.0-20260323032024-d7b638033a14 h1:TVSx20m6DZMSiI37Dduu9RZb8yUvT1sgW8kCLAe+T5U= github.com/tikv/pd/client v0.0.0-20260323032024-d7b638033a14/go.mod h1:4kxXuAQAREpH+lVbydVwGNNDmcwdj0RG4Ofwky08W/k= github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 h1:9LPGD+jzxMlnk5r6+hJnar67cgpDIz/iyD+rfl5r2Vk= diff --git a/pkg/distsql/select_result.go b/pkg/distsql/select_result.go index 054a91476c169..8a6cc4f3690f7 100644 --- a/pkg/distsql/select_result.go +++ b/pkg/distsql/select_result.go @@ -643,9 +643,14 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr } } - if ruDetailsRaw := ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil && r.storeType == kv.TiFlash { - if err = execdetails.MergeTiFlashRUConsumption(r.selectResp.GetExecutionSummaries(), ruDetailsRaw.(*clientutil.RUDetails)); err != nil { - return err + if r.storeType == kv.TiFlash { + ruv2Metrics := execdetails.RUV2MetricsFromContext(ctx) + if ruv2Metrics == nil || !ruv2Metrics.Bypass() { + if ruDetailsRaw := ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil { + if err = execdetails.MergeTiFlashRUConsumption(r.selectResp.GetExecutionSummaries(), ruDetailsRaw.(*clientutil.RUDetails)); err != nil { + return err + } + } } } if copStats.TimeDetail.ProcessTime > 0 { diff --git a/pkg/executor/adapter.go b/pkg/executor/adapter.go index 698f9e5ceb3b0..4dd26ea9cd8b9 100644 --- a/pkg/executor/adapter.go +++ b/pkg/executor/adapter.go @@ -1708,7 +1708,7 @@ func recordInsertRows2Metrics(sessVars *variable.SessionVars) { func (a *ExecStmt) finalizeStatementRUV2Metrics() { sessVars := a.Ctx.GetSessionVars() - if sessVars.RUV2Metrics == nil { + if sessVars.RUV2Metrics == nil || sessVars.RUV2Metrics.Bypass() { return } diff --git a/pkg/executor/adapter_test.go b/pkg/executor/adapter_test.go index 7216446828e96..fe233070f312f 100644 --- a/pkg/executor/adapter_test.go +++ b/pkg/executor/adapter_test.go @@ -533,6 +533,42 @@ func TestFinishExecuteStmtReportsTiDBRUV2WithoutSyncingRUDetails(t *testing.T) { "update `stmt_summary_retry`%", ).Check(testkit.Rows("0 0")) }) + + t.Run("bypass ru skips final reporting", func(t *testing.T) { + reporter := &mockRUV2ConsumptionReporter{} + ctx := &mockRUV2ReportingContext{ + Context: mock.NewContext(), + reporter: reporter, + } + sessVars := ctx.GetSessionVars() + sessVars.StartTime = time.Now() + sessVars.StmtCtx.StmtType = "Select" + sessVars.StmtCtx.OriginalSQL = "select 1" + sessVars.StmtCtx.ResetSQLDigest(sessVars.StmtCtx.OriginalSQL) + sessVars.StmtCtx.ResourceGroupName = "rg1" + + goCtx := execdetails.ContextWithInitializedExecDetails(context.Background()) + sessVars.RUV2Metrics = execdetails.RUV2MetricsFromContext(goCtx) + require.NotNil(t, sessVars.RUV2Metrics) + sessVars.RUV2Metrics.SetBypass(true) + sessVars.RUV2Metrics.AddResultChunkCells(100) + + ruDetails := goCtx.Value(util.RUDetailsCtxKey).(*util.RUDetails) + ruDetails.AddTiKVRUV2(12345) + ruDetails.UpdateTiFlash(&rmpb.Consumption{RRU: 10, WRU: 20}) + + execStmt := &executor.ExecStmt{ + Ctx: ctx, + GoCtx: goCtx, + StmtNode: &ast.SelectStmt{}, + } + execStmt.FinishExecuteStmt(0, nil, false) + + require.Empty(t, reporter.group) + require.Zero(t, reporter.tikvRUV2) + require.Zero(t, reporter.tidbRUV2) + require.Zero(t, reporter.tiflashRU) + }) } func TestSlowLogMaxPerSec(t *testing.T) { diff --git a/pkg/executor/internal/mpp/local_mpp_coordinator.go b/pkg/executor/internal/mpp/local_mpp_coordinator.go index 0b39d76c4f6a1..df4dbc154c71c 100644 --- a/pkg/executor/internal/mpp/local_mpp_coordinator.go +++ b/pkg/executor/internal/mpp/local_mpp_coordinator.go @@ -784,9 +784,12 @@ func (c *localMppCoordinator) handleAllReports() error { RecordOneCopTask(-1, kv.TiFlash, detail)] = 0 } } - if ruDetailsRaw := c.ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil { - if err := execdetails.MergeTiFlashRUConsumption(report.executionSummaries, ruDetailsRaw.(*clientutil.RUDetails)); err != nil { - return err + ruv2Metrics := execdetails.RUV2MetricsFromContext(c.ctx) + if ruv2Metrics == nil || !ruv2Metrics.Bypass() { + if ruDetailsRaw := c.ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil { + if err := execdetails.MergeTiFlashRUConsumption(report.executionSummaries, ruDetailsRaw.(*clientutil.RUDetails)); err != nil { + return err + } } } } diff --git a/pkg/server/conn_stmt_test.go b/pkg/server/conn_stmt_test.go index caf207de153e7..8bc927d6cb708 100644 --- a/pkg/server/conn_stmt_test.go +++ b/pkg/server/conn_stmt_test.go @@ -240,6 +240,22 @@ func TestCursorWithParams(t *testing.T) { require.Equal(t, float64(13), reporter.tiflashRU) }) + t.Run("cursor ruv2 bypass skips tracker creation", func(t *testing.T) { + reporter := &mockCursorRUV2ConsumptionReporter{} + goCtx := execdetails.ContextWithInitializedExecDetails(context.Background()) + ruv2Metrics := execdetails.RUV2MetricsFromContext(goCtx) + require.NotNil(t, ruv2Metrics) + ruv2Metrics.SetBypass(true) + ruDetails := goCtx.Value(clientutil.RUDetailsCtxKey).(*clientutil.RUDetails) + ruDetails.AddTiKVRUV2(11) + tracker := resultset.NewCursorRUV2Tracker(reporter, "rg1", ruv2Metrics, ruDetails, execdetails.RUV2Weights{}) + require.Nil(t, tracker) + require.Empty(t, reporter.group) + require.Zero(t, reporter.tikvRUV2) + require.Zero(t, reporter.tidbRUV2) + require.Zero(t, reporter.tiflashRU) + }) + t.Run("write chunks skips column access on first next error", func(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) srv := CreateMockServer(t, store) diff --git a/pkg/server/internal/resultset/resultset.go b/pkg/server/internal/resultset/resultset.go index 4664d8b93f6e3..9276aae2245d6 100644 --- a/pkg/server/internal/resultset/resultset.go +++ b/pkg/server/internal/resultset/resultset.go @@ -90,6 +90,9 @@ func NewCursorRUV2Tracker( if metrics == nil && ruDetails == nil { return nil } + if metrics != nil && metrics.Bypass() { + return nil + } tracker := &CursorRUV2Tracker{ reporter: reporter, resourceGroupName: resourceGroupName, diff --git a/pkg/session/BUILD.bazel b/pkg/session/BUILD.bazel index 49070d0b8a94f..8ad1c907d87cb 100644 --- a/pkg/session/BUILD.bazel +++ b/pkg/session/BUILD.bazel @@ -201,6 +201,7 @@ go_test( "//pkg/util", "//pkg/util/benchdaily", "//pkg/util/chunk", + "//pkg/util/execdetails", "//pkg/util/logutil", "//pkg/util/sqlexec", "@com_github_pingcap_errors//:errors", diff --git a/pkg/session/session.go b/pkg/session/session.go index 2f091811376a0..f582881dc7236 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -1127,15 +1127,19 @@ func (*session) isTxnRetryableError(err error) bool { } func isEndTxnStmt(stmt ast.StmtNode, vars *variable.SessionVars) (bool, error) { - switch n := stmt.(type) { + resolvedStmt, err := resolvePreparedStmt(stmt, vars) + if err != nil { + return false, err + } + if resolvedStmt == nil { + return false, nil + } + if resolvedStmt != stmt { + return isEndTxnStmt(resolvedStmt, vars) + } + switch resolvedStmt.(type) { case *ast.RollbackStmt, *ast.CommitStmt: return true, nil - case *ast.ExecuteStmt: - ps, err := plannercore.GetPreparedStmt(n, vars) - if err != nil { - return false, err - } - return isEndTxnStmt(ps.PreparedAst.Stmt, vars) } return false, nil } @@ -2228,6 +2232,7 @@ func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, fu prevSQL := s.sessionVars.StmtCtx.OriginalSQL prevStmtType := s.sessionVars.StmtCtx.StmtType prevTables := s.sessionVars.StmtCtx.Tables + prevRUV2Metrics := s.sessionVars.RUV2Metrics return s, func() { s.sessionVars.AnalyzeVersion = prevStatsVer s.sessionVars.EnableAnalyzeSnapshot = prevAnalyzeSnapshot @@ -2240,6 +2245,7 @@ func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, fu s.sessionVars.StmtCtx.OriginalSQL = prevSQL s.sessionVars.StmtCtx.StmtType = prevStmtType s.sessionVars.StmtCtx.Tables = prevTables + s.sessionVars.RUV2Metrics = prevRUV2Metrics s.sessionVars.MemTracker.Detach() }, nil } @@ -2441,6 +2447,10 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s ctx = context.WithValue(ctx, execdetails.RUV2MetricsCtxKey, ruv2Metrics) } sessVars.RUV2Metrics = ruv2Metrics + bypass := shouldBypass(ctx, stmtNode, sessVars) + if ruv2Metrics != nil { + ruv2Metrics.SetBypass(bypass) + } if pending := sessVars.RUV2PendingSessionParserTotal.Swap(0); pending > 0 && ruv2Metrics != nil { ruv2Metrics.AddSessionParserTotal(pending) } @@ -2489,11 +2499,8 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s }) var stmtLabel string - if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok { - prepareStmt, err := plannercore.GetPreparedStmt(execStmt, s.sessionVars) - if err == nil && prepareStmt.PreparedAst != nil { - stmtLabel = stmtctx.GetStmtLabel(ctx, prepareStmt.PreparedAst.Stmt) - } + if resolvedStmt, err := resolvePreparedStmt(stmtNode, s.sessionVars); err == nil && resolvedStmt != nil { + stmtLabel = stmtctx.GetStmtLabel(ctx, resolvedStmt) } if stmtLabel == "" { stmtLabel = stmtctx.GetStmtLabel(ctx, stmtNode) @@ -2728,6 +2735,52 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s return recordSet, nil } +var isNextGenForRUV2 = kerneltype.IsNextGen + +func resolvePreparedStmt(stmt ast.StmtNode, vars *variable.SessionVars) (ast.StmtNode, error) { + if stmt == nil { + return nil, nil + } + execStmt, ok := stmt.(*ast.ExecuteStmt) + if !ok { + return stmt, nil + } + if vars == nil { + return nil, nil + } + prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars) + if err != nil { + return nil, err + } + if prepareStmt == nil || prepareStmt.PreparedAst == nil { + return nil, nil + } + return prepareStmt.PreparedAst.Stmt, nil +} + +func shouldBypass(ctx context.Context, stmtNode ast.StmtNode, sessVars *variable.SessionVars) bool { + switch kv.GetInternalSourceType(ctx) { + case kv.InternalTxnOthers: + return true + case kv.InternalTxnStats: + return isNextGenForRUV2() && isAnalyzeStatementForRUV2(stmtNode, sessVars) + default: + return false + } +} + +func isAnalyzeStatementForRUV2(stmtNode ast.StmtNode, sessVars *variable.SessionVars) bool { + if stmtNode == nil || sessVars == nil { + return false + } + resolvedStmt, err := resolvePreparedStmt(stmtNode, sessVars) + if err != nil || resolvedStmt == nil { + return false + } + _, ok := resolvedStmt.(*ast.AnalyzeTableStmt) + return ok +} + func (s *session) GetSQLExecutor() sqlexec.SQLExecutor { return s } @@ -4765,22 +4818,13 @@ func (s *session) shouldUsePessimisticAutoCommit(stmtNode ast.StmtNode) bool { } // isDMLStatement checks if the given statement should use pessimistic-auto-commit. -// It handles EXECUTE unwrapping and properly handles EXPLAIN statements by checking their inner statement. +// It unwraps EXECUTE statements and properly handles EXPLAIN statements by checking their inner statement. func (s *session) isDMLStatement(stmtNode ast.StmtNode) bool { - if stmtNode == nil { + actualStmt, err := resolvePreparedStmt(stmtNode, s.GetSessionVars()) + if err != nil || actualStmt == nil { return false } - // Handle EXECUTE statements - unwrap to get the actual prepared statement - actualStmt := stmtNode - if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok { - prepareStmt, err := plannercore.GetPreparedStmt(execStmt, s.GetSessionVars()) - if err != nil || prepareStmt == nil { - return false - } - actualStmt = prepareStmt.PreparedAst.Stmt - } - // For EXPLAIN statements, check the underlying statement // This ensures EXPLAIN shows the correct plan that would be used if the statement were executed if explainStmt, ok := actualStmt.(*ast.ExplainStmt); ok { diff --git a/pkg/session/tidb_test.go b/pkg/session/tidb_test.go index 0982215f73d5a..413c99107dd14 100644 --- a/pkg/session/tidb_test.go +++ b/pkg/session/tidb_test.go @@ -25,6 +25,8 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/vardef" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/execdetails" + "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/stretchr/testify/require" ) @@ -73,23 +75,80 @@ func TestRUV2SessionParserTotalDoesNotLeakAcrossStandaloneParse(t *testing.T) { se, err := createSession(store) require.NoError(t, err) - _, err = se.ParseWithParams(context.Background(), "select 1") - require.NoError(t, err) - require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load()) + t.Run("standalone parse carries into next statement only once", func(t *testing.T) { + _, err = se.ParseWithParams(context.Background(), "select 1") + require.NoError(t, err) + require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load()) - stmt, err := se.ParseWithParams(context.Background(), "set @a=1") - require.NoError(t, err) - require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load()) + stmt, err := se.ParseWithParams(context.Background(), "set @a=1") + require.NoError(t, err) + require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load()) - _, err = se.ExecuteStmt(context.Background(), stmt) - require.NoError(t, err) - require.Zero(t, se.sessionVars.RUV2PendingSessionParserTotal.Load()) - require.NotNil(t, se.sessionVars.RUV2Metrics) - require.Equal(t, int64(1), se.sessionVars.RUV2Metrics.SessionParserTotal()) + _, err = se.ExecuteStmt(context.Background(), stmt) + require.NoError(t, err) + require.Zero(t, se.sessionVars.RUV2PendingSessionParserTotal.Load()) + require.NotNil(t, se.sessionVars.RUV2Metrics) + require.Equal(t, int64(1), se.sessionVars.RUV2Metrics.SessionParserTotal()) - dctx := se.GetDistSQLCtx() - require.Equal(t, se.sessionVars.RUV2Metrics, dctx.RUV2Metrics) - require.NotNil(t, dctx.RUV2RPCInterceptor) + dctx := se.GetDistSQLCtx() + require.Same(t, se.sessionVars.RUV2Metrics, dctx.RUV2Metrics) + require.NotNil(t, dctx.RUV2RPCInterceptor) + }) + + t.Run("internal others bypass skips parser ru accounting", func(t *testing.T) { + stmt, err := se.ParseWithParams(context.Background(), "set @b=1") + require.NoError(t, err) + require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load()) + + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) + _, err = se.ExecuteStmt(ctx, stmt) + require.NoError(t, err) + require.Zero(t, se.sessionVars.RUV2PendingSessionParserTotal.Load()) + require.NotNil(t, se.sessionVars.RUV2Metrics) + require.True(t, se.sessionVars.RUV2Metrics.Bypass()) + require.Zero(t, se.sessionVars.RUV2Metrics.SessionParserTotal()) + }) + + t.Run("statement bypass decision follows internal analyze semantics", func(t *testing.T) { + statsCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + origIsNextGenForRUV2 := isNextGenForRUV2 + defer func() { + isNextGenForRUV2 = origIsNextGenForRUV2 + }() + + MustExec(t, se, "use test") + MustExec(t, se, "drop table if exists bypass_prepare") + MustExec(t, se, "create table bypass_prepare (a int)") + + stmtID, _, _, err := se.PrepareStmt("analyze table bypass_prepare") + require.NoError(t, err) + prepStmt, err := se.GetSessionVars().GetPreparedStmtByID(stmtID) + require.NoError(t, err) + execAnalyzeStmt := &ast.ExecuteStmt{PrepStmt: prepStmt} + + isNextGenForRUV2 = func() bool { return true } + require.True(t, shouldBypass(statsCtx, &ast.AnalyzeTableStmt{}, se.sessionVars)) + require.True(t, shouldBypass(statsCtx, execAnalyzeStmt, se.sessionVars)) + + isNextGenForRUV2 = func() bool { return false } + require.False(t, shouldBypass(statsCtx, &ast.AnalyzeTableStmt{}, se.sessionVars)) + require.False(t, shouldBypass(statsCtx, execAnalyzeStmt, se.sessionVars)) + require.False(t, shouldBypass(statsCtx, &ast.SelectStmt{}, se.sessionVars)) + }) + + t.Run("current-session restricted sql restores outer ruv2 metrics", func(t *testing.T) { + outerCtx := execdetails.ContextWithInitializedExecDetails(context.Background()) + outerMetrics := execdetails.RUV2MetricsFromContext(outerCtx) + require.NotNil(t, outerMetrics) + se.sessionVars.RUV2Metrics = outerMetrics + + internalCtx := kv.WithInternalSourceType(outerCtx, kv.InternalTxnOthers) + _, _, err := se.ExecRestrictedSQL(internalCtx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "select 1") + require.NoError(t, err) + + require.Same(t, outerMetrics, se.sessionVars.RUV2Metrics) + require.False(t, outerMetrics.Bypass()) + }) } func TestCrossKSSessionDistSQLCtxDoesNotExposeTypedNilRUReporter(t *testing.T) { diff --git a/pkg/store/driver/txn/ruv2_metrics_test.go b/pkg/store/driver/txn/ruv2_metrics_test.go index 7062f20ada437..d36d1c833a46f 100644 --- a/pkg/store/driver/txn/ruv2_metrics_test.go +++ b/pkg/store/driver/txn/ruv2_metrics_test.go @@ -59,6 +59,27 @@ func TestStatementRUV2RPCInterceptor(t *testing.T) { require.Equal(t, int64(1), ruv2Metrics.ResourceManagerWriteCnt()) require.Equal(t, int64(9), ruv2Metrics.TiKVStorageProcessedKeysBatchGet()) require.Equal(t, int64(1), ruv2Metrics.TiKVStorageProcessedKeysGet()) + + t.Run("bypass ru skips interceptor accounting", func(t *testing.T) { + bypassed := execdetails.NewRUV2Metrics() + bypassed.SetBypass(true) + it := NewStatementRUV2RPCInterceptor(bypassed) + require.NotNil(t, it) + + wrapFn := it.Wrap(func(_ string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return &tikvrpc.Response{ + Resp: &kvrpcpb.BatchGetResponse{ + ExecDetailsV2: &kvrpcpb.ExecDetailsV2{ + RuV2: &kvrpcpb.RUV2{StorageProcessedKeysBatchGet: 4}, + }, + }, + }, nil + }) + _, err := wrapFn("tikv-1", &tikvrpc.Request{Type: tikvrpc.CmdBatchGet, StoreTp: tikvrpc.TiKV}) + require.NoError(t, err) + require.Zero(t, bypassed.ResourceManagerReadCnt()) + require.Zero(t, bypassed.TiKVStorageProcessedKeysBatchGet()) + }) } func TestStatementRUV2RPCInterceptorNilMetrics(t *testing.T) { diff --git a/pkg/util/execdetails/execdetails_test.go b/pkg/util/execdetails/execdetails_test.go index aa59c1ed75b4c..cfe92ca597dee 100644 --- a/pkg/util/execdetails/execdetails_test.go +++ b/pkg/util/execdetails/execdetails_test.go @@ -376,6 +376,24 @@ func TestRUV2MetricsSnapshotCalculateRUValues(t *testing.T) { require.Zero(t, metrics.CalculateRUValues(zeroScaleWeights)) require.Equal(t, tikvRU+tiflashRU, metrics.TotalRU(zeroScaleWeights, tikvRU, tiflashRU)) }) + + t.Run("bypass keeps total zero", func(t *testing.T) { + bypassed := NewRUV2Metrics() + bypassed.SetBypass(true) + bypassed.AddResultChunkCells(1000) + bypassed.AddPlanCnt(2) + + require.Zero(t, bypassed.CalculateRUValues(weights)) + require.Zero(t, bypassed.TotalRU(weights, tikvRU, tiflashRU)) + total, detail := FormatRUV2Summary(bypassed, weights, tikvRU, tiflashRU) + require.Empty(t, total) + require.Empty(t, detail) + }) + + t.Run("nil metrics keep tikv and tiflash ru", func(t *testing.T) { + var nilMetrics *RUV2Metrics + require.Equal(t, tikvRU+tiflashRU, nilMetrics.TotalRU(weights, tikvRU, tiflashRU)) + }) } func TestRUV2MetricsSnapshotFreezesRUValues(t *testing.T) { diff --git a/pkg/util/execdetails/ruv2_metrics.go b/pkg/util/execdetails/ruv2_metrics.go index 85d52e9ef443b..30bb3d057c411 100644 --- a/pkg/util/execdetails/ruv2_metrics.go +++ b/pkg/util/execdetails/ruv2_metrics.go @@ -60,6 +60,8 @@ func RUV2MetricsFromContext(ctx context.Context) *RUV2Metrics { // RUV2Metrics stores statement-level RUv2 metrics. type RUV2Metrics struct { + bypass atomic.Bool + resultChunkCells int64 executorL1 sync.Map @@ -89,15 +91,28 @@ func NewRUV2Metrics() *RUV2Metrics { return &RUV2Metrics{} } +// SetBypass marks whether statement-level RU accounting should be skipped. +func (m *RUV2Metrics) SetBypass(enabled bool) { + m.bypass.Store(enabled) +} + +// Bypass returns whether statement-level RU accounting should be skipped. +func (m *RUV2Metrics) Bypass() bool { + return m.bypass.Load() +} + // AddResultChunkCells records result cells written by the current statement. func (m *RUV2Metrics) AddResultChunkCells(delta int64) { + if m.Bypass() { + return + } metrics.RUV2ResultChunkCells.Add(float64(delta)) atomic.AddInt64(&m.resultChunkCells, delta) } // AddExecutorMetric records a statement-level executor metric for the given RUv2 level. func (m *RUV2Metrics) AddExecutorMetric(level int, label string, delta int64) { - if delta == 0 || label == "" { + if m.Bypass() || delta == 0 || label == "" { return } switch level { @@ -120,85 +135,124 @@ func (m *RUV2Metrics) AddExecutorMetric(level int, label string, delta int64) { // AddExecutorL5InsertRows records affected insert rows for RUv2 accounting. func (m *RUV2Metrics) AddExecutorL5InsertRows(delta int64) { + if m.Bypass() { + return + } metrics.RUV2ExecutorL5InsertRows.Add(float64(delta)) atomic.AddInt64(&m.executorL5InsertRows, delta) } // AddPlanCnt records plan builder invocations for the current statement. func (m *RUV2Metrics) AddPlanCnt(delta int64) { + if m.Bypass() { + return + } metrics.RUV2PlanCnt.Add(float64(delta)) atomic.AddInt64(&m.planCnt, delta) } // AddPlanDeriveStatsPaths records derived stats paths for the current statement. func (m *RUV2Metrics) AddPlanDeriveStatsPaths(delta int64) { + if m.Bypass() { + return + } metrics.RUV2PlanDeriveStatsPaths.Add(float64(delta)) atomic.AddInt64(&m.planDeriveStatsPaths, delta) } // AddSessionParserTotal records parser executions for the current statement. func (m *RUV2Metrics) AddSessionParserTotal(delta int64) { + if m.Bypass() { + return + } metrics.RUV2SessionParserTotal.Add(float64(delta)) atomic.AddInt64(&m.sessionParserTotal, delta) } // AddTxnCnt records transaction completions attributed to the current statement. func (m *RUV2Metrics) AddTxnCnt(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TxnCnt.Add(float64(delta)) atomic.AddInt64(&m.txnCnt, delta) } // AddResourceManagerReadCnt records TiKV read RPCs charged to resource management. func (m *RUV2Metrics) AddResourceManagerReadCnt(delta int64) { + if m.Bypass() { + return + } metrics.RUV2ResourceManagerReadCnt.Add(float64(delta)) atomic.AddInt64(&m.resourceManagerReadCnt, delta) } // AddResourceManagerWriteCnt records TiKV write RPCs charged to resource management. func (m *RUV2Metrics) AddResourceManagerWriteCnt(delta int64) { + if m.Bypass() { + return + } metrics.RUV2ResourceManagerWriteCnt.Add(float64(delta)) atomic.AddInt64(&m.resourceManagerWriteCnt, delta) } // AddTiKVKVEngineCacheMiss records TiKV kv_engine_cache_miss counters from ExecDetailsV2. func (m *RUV2Metrics) AddTiKVKVEngineCacheMiss(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TiKVKVEngineCacheMiss.Add(float64(delta)) atomic.AddInt64(&m.tikvKvEngineCacheMiss, delta) } // AddTiKVCoprocessorExecutorIterations records TiKV coprocessor iteration counters. func (m *RUV2Metrics) AddTiKVCoprocessorExecutorIterations(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TiKVCoprocessorExecutorIterations.Add(float64(delta)) atomic.AddInt64(&m.tikvCoprocessorExecutorIterations, delta) } // AddTiKVCoprocessorResponseBytes records TiKV coprocessor response bytes. func (m *RUV2Metrics) AddTiKVCoprocessorResponseBytes(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TiKVCoprocessorResponseBytes.Add(float64(delta)) atomic.AddInt64(&m.tikvCoprocessorResponseBytes, delta) } // AddTiKVRaftstoreStoreWriteTriggerWB records TiKV raftstore write trigger bytes. func (m *RUV2Metrics) AddTiKVRaftstoreStoreWriteTriggerWB(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TiKVRaftstoreStoreWriteTriggerWB.Add(float64(delta)) atomic.AddInt64(&m.tikvRaftstoreStoreWriteTriggerWB, delta) } // AddTiKVStorageProcessedKeysBatchGet records TiKV batch-get processed keys. func (m *RUV2Metrics) AddTiKVStorageProcessedKeysBatchGet(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TiKVStorageProcessedKeysBatchGet.Add(float64(delta)) atomic.AddInt64(&m.tikvStorageProcessedKeysBatchGet, delta) } // AddTiKVStorageProcessedKeysGet records TiKV get processed keys. func (m *RUV2Metrics) AddTiKVStorageProcessedKeysGet(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TiKVStorageProcessedKeysGet.Add(float64(delta)) atomic.AddInt64(&m.tikvStorageProcessedKeysGet, delta) } // AddTiKVCoprocessorWorkTotal records TiKV executor input counters by executor type. func (m *RUV2Metrics) AddTiKVCoprocessorWorkTotal(label string, delta int64) { - if delta == 0 || label == "" { + if m.Bypass() || delta == 0 || label == "" { return } metrics.RUV2TiKVCoprocessorWorkTotal.WithLabelValues(label).Add(float64(delta)) @@ -211,6 +265,7 @@ func (m *RUV2Metrics) Clone() *RUV2Metrics { return nil } cloned := &RUV2Metrics{} + cloned.bypass.Store(m.Bypass()) atomic.StoreInt64(&cloned.resultChunkCells, atomic.LoadInt64(&m.resultChunkCells)) cloneRUV2LabelCounter(&cloned.executorL1, &m.executorL1) cloneRUV2LabelCounter(&cloned.executorL2, &m.executorL2) @@ -295,6 +350,9 @@ func (m *RUV2Metrics) Merge(other *RUV2Metrics) { if m == nil || other == nil { return } + if m.Bypass() || other.Bypass() { + return + } atomic.AddInt64(&m.resultChunkCells, other.ResultChunkCells()) mergeIntoRUV2LabelCounter(&m.executorL1, &other.executorL1) mergeIntoRUV2LabelCounter(&m.executorL2, &other.executorL2) @@ -436,7 +494,7 @@ func (m *RUV2Metrics) TiKVStorageProcessedKeysGet() int64 { // IsZero checks whether all metrics are zero. func (m *RUV2Metrics) IsZero() bool { - if m == nil { + if m == nil || m.Bypass() { return true } return m.ResultChunkCells() == 0 && @@ -463,7 +521,7 @@ func (m *RUV2Metrics) IsZero() bool { // provided weights. The weights specify how each component is weighted in the // RU calculation. Returns the calculated TiDB RU as a float64. func (m *RUV2Metrics) CalculateRUValues(weights RUV2Weights) (tidbRU float64) { - if m == nil { + if m == nil || m.Bypass() { return 0 } return m.calculateRUValuesWithWeights(weights) @@ -471,6 +529,12 @@ func (m *RUV2Metrics) CalculateRUValues(weights RUV2Weights) (tidbRU float64) { // TotalRU returns the statement RU v2 total as TiDB + TiKV + TiFlash. func (m *RUV2Metrics) TotalRU(weights RUV2Weights, tiKVRU, tiFlashRU float64) float64 { + if m == nil { + return tiKVRU + tiFlashRU + } + if m.Bypass() { + return 0 + } return m.CalculateRUValues(weights) + tiKVRU + tiFlashRU } @@ -504,6 +568,9 @@ func sumRUV2LabelMap(values map[string]int64) int64 { // FormatRUV2Summary formats the RUv2 total and detailed metrics in one pass. func FormatRUV2Summary(metrics *RUV2Metrics, weights RUV2Weights, tiKVRU, tiFlashRU float64) (total string, detail string) { + if metrics != nil && metrics.Bypass() { + return "", "" + } var ( resultChunkCells int64 executorL1 map[string]int64