From 77702df09249eb7cd1ee3d3e92c7c96e96f84301 Mon Sep 17 00:00:00 2001 From: disksing Date: Sat, 28 Mar 2026 11:49:50 +0800 Subject: [PATCH 1/8] executor, session, server, store, util: bypass ruv2 for v1 bypass requests Signed-off-by: disksing --- pkg/distsql/select_result.go | 11 ++- pkg/executor/adapter.go | 2 +- pkg/executor/adapter_test.go | 37 ++++++++++ .../internal/mpp/local_mpp_coordinator.go | 9 ++- pkg/server/conn_stmt_test.go | 16 +++++ pkg/server/internal/resultset/resultset.go | 3 + pkg/session/session.go | 35 +++++++++ pkg/session/tidb_test.go | 52 ++++++++++---- pkg/sessionctx/stmtctx/stmtctx.go | 3 + pkg/store/driver/txn/ruv2_metrics_test.go | 21 ++++++ pkg/util/execdetails/execdetails_test.go | 13 ++++ pkg/util/execdetails/ruv2_metrics.go | 72 +++++++++++++++++-- 12 files changed, 249 insertions(+), 25 deletions(-) diff --git a/pkg/distsql/select_result.go b/pkg/distsql/select_result.go index 054a91476c169..8a6cc4f3690f7 100644 --- a/pkg/distsql/select_result.go +++ b/pkg/distsql/select_result.go @@ -643,9 +643,14 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr } } - if ruDetailsRaw := ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil && r.storeType == kv.TiFlash { - if err = execdetails.MergeTiFlashRUConsumption(r.selectResp.GetExecutionSummaries(), ruDetailsRaw.(*clientutil.RUDetails)); err != nil { - return err + if r.storeType == kv.TiFlash { + ruv2Metrics := execdetails.RUV2MetricsFromContext(ctx) + if ruv2Metrics == nil || !ruv2Metrics.Bypass() { + if ruDetailsRaw := ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil { + if err = execdetails.MergeTiFlashRUConsumption(r.selectResp.GetExecutionSummaries(), ruDetailsRaw.(*clientutil.RUDetails)); err != nil { + return err + } + } } } if copStats.TimeDetail.ProcessTime > 0 { diff --git a/pkg/executor/adapter.go b/pkg/executor/adapter.go index 698f9e5ceb3b0..4dd26ea9cd8b9 100644 --- a/pkg/executor/adapter.go +++ b/pkg/executor/adapter.go @@ -1708,7 +1708,7 @@ func recordInsertRows2Metrics(sessVars *variable.SessionVars) { func (a *ExecStmt) finalizeStatementRUV2Metrics() { sessVars := a.Ctx.GetSessionVars() - if sessVars.RUV2Metrics == nil { + if sessVars.RUV2Metrics == nil || sessVars.RUV2Metrics.Bypass() { return } diff --git a/pkg/executor/adapter_test.go b/pkg/executor/adapter_test.go index 7216446828e96..9a970e6baa9b0 100644 --- a/pkg/executor/adapter_test.go +++ b/pkg/executor/adapter_test.go @@ -533,6 +533,43 @@ func TestFinishExecuteStmtReportsTiDBRUV2WithoutSyncingRUDetails(t *testing.T) { "update `stmt_summary_retry`%", ).Check(testkit.Rows("0 0")) }) + + t.Run("bypass ru skips final reporting", func(t *testing.T) { + reporter := &mockRUV2ConsumptionReporter{} + ctx := &mockRUV2ReportingContext{ + Context: mock.NewContext(), + reporter: reporter, + } + sessVars := ctx.GetSessionVars() + sessVars.StartTime = time.Now() + sessVars.StmtCtx.StmtType = "Select" + sessVars.StmtCtx.OriginalSQL = "select 1" + sessVars.StmtCtx.ResetSQLDigest(sessVars.StmtCtx.OriginalSQL) + sessVars.StmtCtx.ResourceGroupName = "rg1" + sessVars.StmtCtx.BypassRU = true + + goCtx := execdetails.ContextWithInitializedExecDetails(context.Background()) + sessVars.RUV2Metrics = execdetails.RUV2MetricsFromContext(goCtx) + require.NotNil(t, sessVars.RUV2Metrics) + sessVars.RUV2Metrics.SetBypass(true) + sessVars.RUV2Metrics.AddResultChunkCells(100) + + ruDetails := goCtx.Value(util.RUDetailsCtxKey).(*util.RUDetails) + ruDetails.AddTiKVRUV2(12345) + ruDetails.UpdateTiFlash(&rmpb.Consumption{RRU: 10, WRU: 20}) + + execStmt := &executor.ExecStmt{ + Ctx: ctx, + GoCtx: goCtx, + StmtNode: &ast.SelectStmt{}, + } + execStmt.FinishExecuteStmt(0, nil, false) + + require.Empty(t, reporter.group) + require.Zero(t, reporter.tikvRUV2) + require.Zero(t, reporter.tidbRUV2) + require.Zero(t, reporter.tiflashRU) + }) } func TestSlowLogMaxPerSec(t *testing.T) { diff --git a/pkg/executor/internal/mpp/local_mpp_coordinator.go b/pkg/executor/internal/mpp/local_mpp_coordinator.go index 0b39d76c4f6a1..9bae77f8771d8 100644 --- a/pkg/executor/internal/mpp/local_mpp_coordinator.go +++ b/pkg/executor/internal/mpp/local_mpp_coordinator.go @@ -784,9 +784,12 @@ func (c *localMppCoordinator) handleAllReports() error { RecordOneCopTask(-1, kv.TiFlash, detail)] = 0 } } - if ruDetailsRaw := c.ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil { - if err := execdetails.MergeTiFlashRUConsumption(report.executionSummaries, ruDetailsRaw.(*clientutil.RUDetails)); err != nil { - return err + ruv2Metrics := c.sessionCtx.GetSessionVars().RUV2Metrics + if ruv2Metrics == nil || !ruv2Metrics.Bypass() { + if ruDetailsRaw := c.ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil { + if err := execdetails.MergeTiFlashRUConsumption(report.executionSummaries, ruDetailsRaw.(*clientutil.RUDetails)); err != nil { + return err + } } } } diff --git a/pkg/server/conn_stmt_test.go b/pkg/server/conn_stmt_test.go index caf207de153e7..8bc927d6cb708 100644 --- a/pkg/server/conn_stmt_test.go +++ b/pkg/server/conn_stmt_test.go @@ -240,6 +240,22 @@ func TestCursorWithParams(t *testing.T) { require.Equal(t, float64(13), reporter.tiflashRU) }) + t.Run("cursor ruv2 bypass skips tracker creation", func(t *testing.T) { + reporter := &mockCursorRUV2ConsumptionReporter{} + goCtx := execdetails.ContextWithInitializedExecDetails(context.Background()) + ruv2Metrics := execdetails.RUV2MetricsFromContext(goCtx) + require.NotNil(t, ruv2Metrics) + ruv2Metrics.SetBypass(true) + ruDetails := goCtx.Value(clientutil.RUDetailsCtxKey).(*clientutil.RUDetails) + ruDetails.AddTiKVRUV2(11) + tracker := resultset.NewCursorRUV2Tracker(reporter, "rg1", ruv2Metrics, ruDetails, execdetails.RUV2Weights{}) + require.Nil(t, tracker) + require.Empty(t, reporter.group) + require.Zero(t, reporter.tikvRUV2) + require.Zero(t, reporter.tidbRUV2) + require.Zero(t, reporter.tiflashRU) + }) + t.Run("write chunks skips column access on first next error", func(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) srv := CreateMockServer(t, store) diff --git a/pkg/server/internal/resultset/resultset.go b/pkg/server/internal/resultset/resultset.go index 4664d8b93f6e3..9276aae2245d6 100644 --- a/pkg/server/internal/resultset/resultset.go +++ b/pkg/server/internal/resultset/resultset.go @@ -90,6 +90,9 @@ func NewCursorRUV2Tracker( if metrics == nil && ruDetails == nil { return nil } + if metrics != nil && metrics.Bypass() { + return nil + } tracker := &CursorRUV2Tracker{ reporter: reporter, resourceGroupName: resourceGroupName, diff --git a/pkg/session/session.go b/pkg/session/session.go index 2f091811376a0..bd467cacecf7d 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2441,6 +2441,11 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s ctx = context.WithValue(ctx, execdetails.RUV2MetricsCtxKey, ruv2Metrics) } sessVars.RUV2Metrics = ruv2Metrics + bypass := shouldBypass(ctx, stmtNode, sessVars) + sessVars.StmtCtx.BypassRU = bypass + if ruv2Metrics != nil { + ruv2Metrics.SetBypass(bypass) + } if pending := sessVars.RUV2PendingSessionParserTotal.Swap(0); pending > 0 && ruv2Metrics != nil { ruv2Metrics.AddSessionParserTotal(pending) } @@ -2728,6 +2733,36 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s return recordSet, nil } +func shouldBypass(ctx context.Context, stmtNode ast.StmtNode, sessVars *variable.SessionVars) bool { + switch kv.GetInternalSourceType(ctx) { + case kv.InternalTxnOthers: + return true + case kv.InternalTxnStats: + return kerneltype.IsNextGen() && isAnalyzeStatementForRUV2(stmtNode, sessVars) + default: + return false + } +} + +func isAnalyzeStatementForRUV2(stmtNode ast.StmtNode, sessVars *variable.SessionVars) bool { + if stmtNode == nil || sessVars == nil { + return false + } + switch stmt := stmtNode.(type) { + case *ast.AnalyzeTableStmt: + return true + case *ast.ExecuteStmt: + prepareStmt, err := plannercore.GetPreparedStmt(stmt, sessVars) + if err != nil { + return false + } + _, ok := prepareStmt.PreparedAst.Stmt.(*ast.AnalyzeTableStmt) + return ok + default: + return false + } +} + func (s *session) GetSQLExecutor() sqlexec.SQLExecutor { return s } diff --git a/pkg/session/tidb_test.go b/pkg/session/tidb_test.go index 0982215f73d5a..a1080c58ea960 100644 --- a/pkg/session/tidb_test.go +++ b/pkg/session/tidb_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/config/kerneltype" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser/ast" @@ -73,23 +74,46 @@ func TestRUV2SessionParserTotalDoesNotLeakAcrossStandaloneParse(t *testing.T) { se, err := createSession(store) require.NoError(t, err) - _, err = se.ParseWithParams(context.Background(), "select 1") - require.NoError(t, err) - require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load()) + t.Run("standalone parse carries into next statement only once", func(t *testing.T) { + _, err = se.ParseWithParams(context.Background(), "select 1") + require.NoError(t, err) + require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load()) - stmt, err := se.ParseWithParams(context.Background(), "set @a=1") - require.NoError(t, err) - require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load()) + stmt, err := se.ParseWithParams(context.Background(), "set @a=1") + require.NoError(t, err) + require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load()) - _, err = se.ExecuteStmt(context.Background(), stmt) - require.NoError(t, err) - require.Zero(t, se.sessionVars.RUV2PendingSessionParserTotal.Load()) - require.NotNil(t, se.sessionVars.RUV2Metrics) - require.Equal(t, int64(1), se.sessionVars.RUV2Metrics.SessionParserTotal()) + _, err = se.ExecuteStmt(context.Background(), stmt) + require.NoError(t, err) + require.Zero(t, se.sessionVars.RUV2PendingSessionParserTotal.Load()) + require.NotNil(t, se.sessionVars.RUV2Metrics) + require.Equal(t, int64(1), se.sessionVars.RUV2Metrics.SessionParserTotal()) - dctx := se.GetDistSQLCtx() - require.Equal(t, se.sessionVars.RUV2Metrics, dctx.RUV2Metrics) - require.NotNil(t, dctx.RUV2RPCInterceptor) + dctx := se.GetDistSQLCtx() + require.Equal(t, se.sessionVars.RUV2Metrics, dctx.RUV2Metrics) + require.NotNil(t, dctx.RUV2RPCInterceptor) + }) + + t.Run("internal others bypass skips parser ru accounting", func(t *testing.T) { + stmt, err := se.ParseWithParams(context.Background(), "set @b=1") + require.NoError(t, err) + require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load()) + + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) + _, err = se.ExecuteStmt(ctx, stmt) + require.NoError(t, err) + require.Zero(t, se.sessionVars.RUV2PendingSessionParserTotal.Load()) + require.True(t, se.sessionVars.StmtCtx.BypassRU) + require.NotNil(t, se.sessionVars.RUV2Metrics) + require.True(t, se.sessionVars.RUV2Metrics.Bypass()) + require.Zero(t, se.sessionVars.RUV2Metrics.SessionParserTotal()) + }) + + t.Run("statement bypass decision follows internal analyze semantics", func(t *testing.T) { + statsCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + require.Equal(t, kerneltype.IsNextGen(), shouldBypass(statsCtx, &ast.AnalyzeTableStmt{}, se.sessionVars)) + require.False(t, shouldBypass(statsCtx, &ast.SelectStmt{}, se.sessionVars)) + }) } func TestCrossKSSessionDistSQLCtxDoesNotExposeTypedNilRUReporter(t *testing.T) { diff --git a/pkg/sessionctx/stmtctx/stmtctx.go b/pkg/sessionctx/stmtctx/stmtctx.go index 1284eddc81ea0..ac7dfa524da2e 100644 --- a/pkg/sessionctx/stmtctx/stmtctx.go +++ b/pkg/sessionctx/stmtctx/stmtctx.go @@ -482,6 +482,9 @@ type StatementContext struct { // IsExplainAnalyzeDML is true if the statement is "explain analyze DML executors", before responding the explain // results to the client, the transaction should be committed first. See issue #37373 for more details. IsExplainAnalyzeDML bool + // BypassRU marks statements whose RU accounting should be skipped to stay aligned with + // the existing v1 bypass semantics for internal low-impact and internal analyze requests. + BypassRU bool // InsertRowsAsRUV2Recorded tracks whether the statement-level insert-row RUv2 cost has already been // applied to RUV2Metrics. This must stay idempotent because EXPLAIN ANALYZE INSERT snapshots RU before // FinishExecuteStmt runs, while FinishExecuteStmt still needs to reuse the same accounting path for the diff --git a/pkg/store/driver/txn/ruv2_metrics_test.go b/pkg/store/driver/txn/ruv2_metrics_test.go index 7062f20ada437..d36d1c833a46f 100644 --- a/pkg/store/driver/txn/ruv2_metrics_test.go +++ b/pkg/store/driver/txn/ruv2_metrics_test.go @@ -59,6 +59,27 @@ func TestStatementRUV2RPCInterceptor(t *testing.T) { require.Equal(t, int64(1), ruv2Metrics.ResourceManagerWriteCnt()) require.Equal(t, int64(9), ruv2Metrics.TiKVStorageProcessedKeysBatchGet()) require.Equal(t, int64(1), ruv2Metrics.TiKVStorageProcessedKeysGet()) + + t.Run("bypass ru skips interceptor accounting", func(t *testing.T) { + bypassed := execdetails.NewRUV2Metrics() + bypassed.SetBypass(true) + it := NewStatementRUV2RPCInterceptor(bypassed) + require.NotNil(t, it) + + wrapFn := it.Wrap(func(_ string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return &tikvrpc.Response{ + Resp: &kvrpcpb.BatchGetResponse{ + ExecDetailsV2: &kvrpcpb.ExecDetailsV2{ + RuV2: &kvrpcpb.RUV2{StorageProcessedKeysBatchGet: 4}, + }, + }, + }, nil + }) + _, err := wrapFn("tikv-1", &tikvrpc.Request{Type: tikvrpc.CmdBatchGet, StoreTp: tikvrpc.TiKV}) + require.NoError(t, err) + require.Zero(t, bypassed.ResourceManagerReadCnt()) + require.Zero(t, bypassed.TiKVStorageProcessedKeysBatchGet()) + }) } func TestStatementRUV2RPCInterceptorNilMetrics(t *testing.T) { diff --git a/pkg/util/execdetails/execdetails_test.go b/pkg/util/execdetails/execdetails_test.go index aa59c1ed75b4c..3020bc7608423 100644 --- a/pkg/util/execdetails/execdetails_test.go +++ b/pkg/util/execdetails/execdetails_test.go @@ -376,6 +376,19 @@ func TestRUV2MetricsSnapshotCalculateRUValues(t *testing.T) { require.Zero(t, metrics.CalculateRUValues(zeroScaleWeights)) require.Equal(t, tikvRU+tiflashRU, metrics.TotalRU(zeroScaleWeights, tikvRU, tiflashRU)) }) + + t.Run("bypass keeps total zero", func(t *testing.T) { + bypassed := NewRUV2Metrics() + bypassed.SetBypass(true) + bypassed.AddResultChunkCells(1000) + bypassed.AddPlanCnt(2) + + require.Zero(t, bypassed.CalculateRUValues(weights)) + require.Zero(t, bypassed.TotalRU(weights, tikvRU, tiflashRU)) + total, detail := FormatRUV2Summary(bypassed, weights, tikvRU, tiflashRU) + require.Empty(t, total) + require.Empty(t, detail) + }) } func TestRUV2MetricsSnapshotFreezesRUValues(t *testing.T) { diff --git a/pkg/util/execdetails/ruv2_metrics.go b/pkg/util/execdetails/ruv2_metrics.go index 85d52e9ef443b..a7e326cd501b3 100644 --- a/pkg/util/execdetails/ruv2_metrics.go +++ b/pkg/util/execdetails/ruv2_metrics.go @@ -60,6 +60,8 @@ func RUV2MetricsFromContext(ctx context.Context) *RUV2Metrics { // RUV2Metrics stores statement-level RUv2 metrics. type RUV2Metrics struct { + bypass atomic.Bool + resultChunkCells int64 executorL1 sync.Map @@ -89,15 +91,28 @@ func NewRUV2Metrics() *RUV2Metrics { return &RUV2Metrics{} } +// SetBypass marks whether statement-level RU accounting should be skipped. +func (m *RUV2Metrics) SetBypass(enabled bool) { + m.bypass.Store(enabled) +} + +// Bypass returns whether statement-level RU accounting should be skipped. +func (m *RUV2Metrics) Bypass() bool { + return m.bypass.Load() +} + // AddResultChunkCells records result cells written by the current statement. func (m *RUV2Metrics) AddResultChunkCells(delta int64) { + if m.Bypass() { + return + } metrics.RUV2ResultChunkCells.Add(float64(delta)) atomic.AddInt64(&m.resultChunkCells, delta) } // AddExecutorMetric records a statement-level executor metric for the given RUv2 level. func (m *RUV2Metrics) AddExecutorMetric(level int, label string, delta int64) { - if delta == 0 || label == "" { + if m.Bypass() || delta == 0 || label == "" { return } switch level { @@ -120,85 +135,124 @@ func (m *RUV2Metrics) AddExecutorMetric(level int, label string, delta int64) { // AddExecutorL5InsertRows records affected insert rows for RUv2 accounting. func (m *RUV2Metrics) AddExecutorL5InsertRows(delta int64) { + if m.Bypass() { + return + } metrics.RUV2ExecutorL5InsertRows.Add(float64(delta)) atomic.AddInt64(&m.executorL5InsertRows, delta) } // AddPlanCnt records plan builder invocations for the current statement. func (m *RUV2Metrics) AddPlanCnt(delta int64) { + if m.Bypass() { + return + } metrics.RUV2PlanCnt.Add(float64(delta)) atomic.AddInt64(&m.planCnt, delta) } // AddPlanDeriveStatsPaths records derived stats paths for the current statement. func (m *RUV2Metrics) AddPlanDeriveStatsPaths(delta int64) { + if m.Bypass() { + return + } metrics.RUV2PlanDeriveStatsPaths.Add(float64(delta)) atomic.AddInt64(&m.planDeriveStatsPaths, delta) } // AddSessionParserTotal records parser executions for the current statement. func (m *RUV2Metrics) AddSessionParserTotal(delta int64) { + if m.Bypass() { + return + } metrics.RUV2SessionParserTotal.Add(float64(delta)) atomic.AddInt64(&m.sessionParserTotal, delta) } // AddTxnCnt records transaction completions attributed to the current statement. func (m *RUV2Metrics) AddTxnCnt(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TxnCnt.Add(float64(delta)) atomic.AddInt64(&m.txnCnt, delta) } // AddResourceManagerReadCnt records TiKV read RPCs charged to resource management. func (m *RUV2Metrics) AddResourceManagerReadCnt(delta int64) { + if m.Bypass() { + return + } metrics.RUV2ResourceManagerReadCnt.Add(float64(delta)) atomic.AddInt64(&m.resourceManagerReadCnt, delta) } // AddResourceManagerWriteCnt records TiKV write RPCs charged to resource management. func (m *RUV2Metrics) AddResourceManagerWriteCnt(delta int64) { + if m.Bypass() { + return + } metrics.RUV2ResourceManagerWriteCnt.Add(float64(delta)) atomic.AddInt64(&m.resourceManagerWriteCnt, delta) } // AddTiKVKVEngineCacheMiss records TiKV kv_engine_cache_miss counters from ExecDetailsV2. func (m *RUV2Metrics) AddTiKVKVEngineCacheMiss(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TiKVKVEngineCacheMiss.Add(float64(delta)) atomic.AddInt64(&m.tikvKvEngineCacheMiss, delta) } // AddTiKVCoprocessorExecutorIterations records TiKV coprocessor iteration counters. func (m *RUV2Metrics) AddTiKVCoprocessorExecutorIterations(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TiKVCoprocessorExecutorIterations.Add(float64(delta)) atomic.AddInt64(&m.tikvCoprocessorExecutorIterations, delta) } // AddTiKVCoprocessorResponseBytes records TiKV coprocessor response bytes. func (m *RUV2Metrics) AddTiKVCoprocessorResponseBytes(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TiKVCoprocessorResponseBytes.Add(float64(delta)) atomic.AddInt64(&m.tikvCoprocessorResponseBytes, delta) } // AddTiKVRaftstoreStoreWriteTriggerWB records TiKV raftstore write trigger bytes. func (m *RUV2Metrics) AddTiKVRaftstoreStoreWriteTriggerWB(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TiKVRaftstoreStoreWriteTriggerWB.Add(float64(delta)) atomic.AddInt64(&m.tikvRaftstoreStoreWriteTriggerWB, delta) } // AddTiKVStorageProcessedKeysBatchGet records TiKV batch-get processed keys. func (m *RUV2Metrics) AddTiKVStorageProcessedKeysBatchGet(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TiKVStorageProcessedKeysBatchGet.Add(float64(delta)) atomic.AddInt64(&m.tikvStorageProcessedKeysBatchGet, delta) } // AddTiKVStorageProcessedKeysGet records TiKV get processed keys. func (m *RUV2Metrics) AddTiKVStorageProcessedKeysGet(delta int64) { + if m.Bypass() { + return + } metrics.RUV2TiKVStorageProcessedKeysGet.Add(float64(delta)) atomic.AddInt64(&m.tikvStorageProcessedKeysGet, delta) } // AddTiKVCoprocessorWorkTotal records TiKV executor input counters by executor type. func (m *RUV2Metrics) AddTiKVCoprocessorWorkTotal(label string, delta int64) { - if delta == 0 || label == "" { + if m.Bypass() || delta == 0 || label == "" { return } metrics.RUV2TiKVCoprocessorWorkTotal.WithLabelValues(label).Add(float64(delta)) @@ -211,6 +265,7 @@ func (m *RUV2Metrics) Clone() *RUV2Metrics { return nil } cloned := &RUV2Metrics{} + cloned.bypass.Store(m.Bypass()) atomic.StoreInt64(&cloned.resultChunkCells, atomic.LoadInt64(&m.resultChunkCells)) cloneRUV2LabelCounter(&cloned.executorL1, &m.executorL1) cloneRUV2LabelCounter(&cloned.executorL2, &m.executorL2) @@ -295,6 +350,9 @@ func (m *RUV2Metrics) Merge(other *RUV2Metrics) { if m == nil || other == nil { return } + if m.Bypass() || other.Bypass() { + return + } atomic.AddInt64(&m.resultChunkCells, other.ResultChunkCells()) mergeIntoRUV2LabelCounter(&m.executorL1, &other.executorL1) mergeIntoRUV2LabelCounter(&m.executorL2, &other.executorL2) @@ -436,7 +494,7 @@ func (m *RUV2Metrics) TiKVStorageProcessedKeysGet() int64 { // IsZero checks whether all metrics are zero. func (m *RUV2Metrics) IsZero() bool { - if m == nil { + if m == nil || m.Bypass() { return true } return m.ResultChunkCells() == 0 && @@ -463,7 +521,7 @@ func (m *RUV2Metrics) IsZero() bool { // provided weights. The weights specify how each component is weighted in the // RU calculation. Returns the calculated TiDB RU as a float64. func (m *RUV2Metrics) CalculateRUValues(weights RUV2Weights) (tidbRU float64) { - if m == nil { + if m == nil || m.Bypass() { return 0 } return m.calculateRUValuesWithWeights(weights) @@ -471,6 +529,9 @@ func (m *RUV2Metrics) CalculateRUValues(weights RUV2Weights) (tidbRU float64) { // TotalRU returns the statement RU v2 total as TiDB + TiKV + TiFlash. func (m *RUV2Metrics) TotalRU(weights RUV2Weights, tiKVRU, tiFlashRU float64) float64 { + if m.Bypass() { + return 0 + } return m.CalculateRUValues(weights) + tiKVRU + tiFlashRU } @@ -504,6 +565,9 @@ func sumRUV2LabelMap(values map[string]int64) int64 { // FormatRUV2Summary formats the RUv2 total and detailed metrics in one pass. func FormatRUV2Summary(metrics *RUV2Metrics, weights RUV2Weights, tiKVRU, tiFlashRU float64) (total string, detail string) { + if metrics != nil && metrics.Bypass() { + return "", "" + } var ( resultChunkCells int64 executorL1 map[string]int64 From e8cc2aeb7d021f6ae7fbd746656fb0a9c33ce45e Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 30 Mar 2026 14:41:33 +0800 Subject: [PATCH 2/8] *: bump client-go to latest master Signed-off-by: disksing --- DEPS.bzl | 12 ++++++------ go.mod | 2 +- go.sum | 2 ++ 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 91254eb81f34f..c600fa4b560db 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -7766,13 +7766,13 @@ def go_deps(): build_tags = ["nextgen", "intest"], build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sha256 = "daff9dec412c452d98f2adf2e2c4ac5132e93acd0f773612c69681cd6f76b4d4", - strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260326064539-728676760deb", + sha256 = "06b88aebfa4f416d2fd77edb70d51f8c590f61f71be6cbfb328e525e9907d027", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260330060945-282ada62b856", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip", - "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index f95af192719fc..186f2f6fd2d56 100644 --- a/go.mod +++ b/go.mod @@ -124,7 +124,7 @@ require ( github.com/stathat/consistent v1.0.0 github.com/stretchr/testify v1.11.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb + github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856 github.com/tikv/pd/client v0.0.0-20260323032024-d7b638033a14 github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 github.com/twmb/murmur3 v1.1.6 diff --git a/go.sum b/go.sum index 21f1dfbda3d8b..dce96c291a288 100644 --- a/go.sum +++ b/go.sum @@ -897,6 +897,8 @@ github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb h1:ZVxuRRtpKp76G4zNePQ9vLV2UwEf8nJjr6Z2kAHTbmg= github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb/go.mod h1:2itxkVYz1s+dKsfJZqKcFD2MGUoDxvyy/Yqjd7vkdjg= +github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856 h1:CJ2alESXl+dW4hQwO1vY5ingIDr6dGRB+8WodZeh24g= +github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856/go.mod h1:2itxkVYz1s+dKsfJZqKcFD2MGUoDxvyy/Yqjd7vkdjg= github.com/tikv/pd/client v0.0.0-20260323032024-d7b638033a14 h1:TVSx20m6DZMSiI37Dduu9RZb8yUvT1sgW8kCLAe+T5U= github.com/tikv/pd/client v0.0.0-20260323032024-d7b638033a14/go.mod h1:4kxXuAQAREpH+lVbydVwGNNDmcwdj0RG4Ofwky08W/k= github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 h1:9LPGD+jzxMlnk5r6+hJnar67cgpDIz/iyD+rfl5r2Vk= From 0776677e0e3de1c4be089ec36673067f8451cdd4 Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 30 Mar 2026 15:03:30 +0800 Subject: [PATCH 3/8] *: remove unused StmtCtx.BypassRU Signed-off-by: disksing --- pkg/executor/adapter_test.go | 1 - pkg/session/session.go | 1 - pkg/session/tidb_test.go | 1 - pkg/sessionctx/stmtctx/stmtctx.go | 3 --- 4 files changed, 6 deletions(-) diff --git a/pkg/executor/adapter_test.go b/pkg/executor/adapter_test.go index 9a970e6baa9b0..fe233070f312f 100644 --- a/pkg/executor/adapter_test.go +++ b/pkg/executor/adapter_test.go @@ -546,7 +546,6 @@ func TestFinishExecuteStmtReportsTiDBRUV2WithoutSyncingRUDetails(t *testing.T) { sessVars.StmtCtx.OriginalSQL = "select 1" sessVars.StmtCtx.ResetSQLDigest(sessVars.StmtCtx.OriginalSQL) sessVars.StmtCtx.ResourceGroupName = "rg1" - sessVars.StmtCtx.BypassRU = true goCtx := execdetails.ContextWithInitializedExecDetails(context.Background()) sessVars.RUV2Metrics = execdetails.RUV2MetricsFromContext(goCtx) diff --git a/pkg/session/session.go b/pkg/session/session.go index bd467cacecf7d..03ddf1f9665e8 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2442,7 +2442,6 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s } sessVars.RUV2Metrics = ruv2Metrics bypass := shouldBypass(ctx, stmtNode, sessVars) - sessVars.StmtCtx.BypassRU = bypass if ruv2Metrics != nil { ruv2Metrics.SetBypass(bypass) } diff --git a/pkg/session/tidb_test.go b/pkg/session/tidb_test.go index a1080c58ea960..dc39263254185 100644 --- a/pkg/session/tidb_test.go +++ b/pkg/session/tidb_test.go @@ -103,7 +103,6 @@ func TestRUV2SessionParserTotalDoesNotLeakAcrossStandaloneParse(t *testing.T) { _, err = se.ExecuteStmt(ctx, stmt) require.NoError(t, err) require.Zero(t, se.sessionVars.RUV2PendingSessionParserTotal.Load()) - require.True(t, se.sessionVars.StmtCtx.BypassRU) require.NotNil(t, se.sessionVars.RUV2Metrics) require.True(t, se.sessionVars.RUV2Metrics.Bypass()) require.Zero(t, se.sessionVars.RUV2Metrics.SessionParserTotal()) diff --git a/pkg/sessionctx/stmtctx/stmtctx.go b/pkg/sessionctx/stmtctx/stmtctx.go index ac7dfa524da2e..1284eddc81ea0 100644 --- a/pkg/sessionctx/stmtctx/stmtctx.go +++ b/pkg/sessionctx/stmtctx/stmtctx.go @@ -482,9 +482,6 @@ type StatementContext struct { // IsExplainAnalyzeDML is true if the statement is "explain analyze DML executors", before responding the explain // results to the client, the transaction should be committed first. See issue #37373 for more details. IsExplainAnalyzeDML bool - // BypassRU marks statements whose RU accounting should be skipped to stay aligned with - // the existing v1 bypass semantics for internal low-impact and internal analyze requests. - BypassRU bool // InsertRowsAsRUV2Recorded tracks whether the statement-level insert-row RUv2 cost has already been // applied to RUV2Metrics. This must stay idempotent because EXPLAIN ANALYZE INSERT snapshots RU before // FinishExecuteStmt runs, while FinishExecuteStmt still needs to reuse the same accounting path for the From 1e3e6f28090e1422be3c0c83b16121ba0a0ccabd Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 30 Mar 2026 16:03:44 +0800 Subject: [PATCH 4/8] tidy Signed-off-by: disksing --- go.sum | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.sum b/go.sum index dce96c291a288..a015ee353ed31 100644 --- a/go.sum +++ b/go.sum @@ -895,8 +895,6 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= -github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb h1:ZVxuRRtpKp76G4zNePQ9vLV2UwEf8nJjr6Z2kAHTbmg= -github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb/go.mod h1:2itxkVYz1s+dKsfJZqKcFD2MGUoDxvyy/Yqjd7vkdjg= github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856 h1:CJ2alESXl+dW4hQwO1vY5ingIDr6dGRB+8WodZeh24g= github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856/go.mod h1:2itxkVYz1s+dKsfJZqKcFD2MGUoDxvyy/Yqjd7vkdjg= github.com/tikv/pd/client v0.0.0-20260323032024-d7b638033a14 h1:TVSx20m6DZMSiI37Dduu9RZb8yUvT1sgW8kCLAe+T5U= From 2206784eeecbb7c48c8fbf89a0b503ebb2362ba4 Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 30 Mar 2026 16:28:56 +0800 Subject: [PATCH 5/8] *: address follow-up review comments Signed-off-by: disksing --- .../internal/mpp/local_mpp_coordinator.go | 2 +- pkg/session/session.go | 4 ++- pkg/session/tidb_test.go | 26 ++++++++++++++++--- pkg/util/execdetails/execdetails_test.go | 5 ++++ pkg/util/execdetails/ruv2_metrics.go | 3 +++ 5 files changed, 35 insertions(+), 5 deletions(-) diff --git a/pkg/executor/internal/mpp/local_mpp_coordinator.go b/pkg/executor/internal/mpp/local_mpp_coordinator.go index 9bae77f8771d8..df4dbc154c71c 100644 --- a/pkg/executor/internal/mpp/local_mpp_coordinator.go +++ b/pkg/executor/internal/mpp/local_mpp_coordinator.go @@ -784,7 +784,7 @@ func (c *localMppCoordinator) handleAllReports() error { RecordOneCopTask(-1, kv.TiFlash, detail)] = 0 } } - ruv2Metrics := c.sessionCtx.GetSessionVars().RUV2Metrics + ruv2Metrics := execdetails.RUV2MetricsFromContext(c.ctx) if ruv2Metrics == nil || !ruv2Metrics.Bypass() { if ruDetailsRaw := c.ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil { if err := execdetails.MergeTiFlashRUConsumption(report.executionSummaries, ruDetailsRaw.(*clientutil.RUDetails)); err != nil { diff --git a/pkg/session/session.go b/pkg/session/session.go index 03ddf1f9665e8..df9ddeb895434 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2732,12 +2732,14 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s return recordSet, nil } +var isNextGenForRUV2 = kerneltype.IsNextGen + func shouldBypass(ctx context.Context, stmtNode ast.StmtNode, sessVars *variable.SessionVars) bool { switch kv.GetInternalSourceType(ctx) { case kv.InternalTxnOthers: return true case kv.InternalTxnStats: - return kerneltype.IsNextGen() && isAnalyzeStatementForRUV2(stmtNode, sessVars) + return isNextGenForRUV2() && isAnalyzeStatementForRUV2(stmtNode, sessVars) default: return false } diff --git a/pkg/session/tidb_test.go b/pkg/session/tidb_test.go index dc39263254185..660d40be86e4a 100644 --- a/pkg/session/tidb_test.go +++ b/pkg/session/tidb_test.go @@ -19,7 +19,6 @@ import ( "testing" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/pkg/config/kerneltype" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser/ast" @@ -90,7 +89,7 @@ func TestRUV2SessionParserTotalDoesNotLeakAcrossStandaloneParse(t *testing.T) { require.Equal(t, int64(1), se.sessionVars.RUV2Metrics.SessionParserTotal()) dctx := se.GetDistSQLCtx() - require.Equal(t, se.sessionVars.RUV2Metrics, dctx.RUV2Metrics) + require.Same(t, se.sessionVars.RUV2Metrics, dctx.RUV2Metrics) require.NotNil(t, dctx.RUV2RPCInterceptor) }) @@ -110,7 +109,28 @@ func TestRUV2SessionParserTotalDoesNotLeakAcrossStandaloneParse(t *testing.T) { t.Run("statement bypass decision follows internal analyze semantics", func(t *testing.T) { statsCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - require.Equal(t, kerneltype.IsNextGen(), shouldBypass(statsCtx, &ast.AnalyzeTableStmt{}, se.sessionVars)) + origIsNextGenForRUV2 := isNextGenForRUV2 + defer func() { + isNextGenForRUV2 = origIsNextGenForRUV2 + }() + + MustExec(t, se, "use test") + MustExec(t, se, "drop table if exists bypass_prepare") + MustExec(t, se, "create table bypass_prepare (a int)") + + stmtID, _, _, err := se.PrepareStmt("analyze table bypass_prepare") + require.NoError(t, err) + prepStmt, err := se.GetSessionVars().GetPreparedStmtByID(stmtID) + require.NoError(t, err) + execAnalyzeStmt := &ast.ExecuteStmt{PrepStmt: prepStmt} + + isNextGenForRUV2 = func() bool { return true } + require.True(t, shouldBypass(statsCtx, &ast.AnalyzeTableStmt{}, se.sessionVars)) + require.True(t, shouldBypass(statsCtx, execAnalyzeStmt, se.sessionVars)) + + isNextGenForRUV2 = func() bool { return false } + require.False(t, shouldBypass(statsCtx, &ast.AnalyzeTableStmt{}, se.sessionVars)) + require.False(t, shouldBypass(statsCtx, execAnalyzeStmt, se.sessionVars)) require.False(t, shouldBypass(statsCtx, &ast.SelectStmt{}, se.sessionVars)) }) } diff --git a/pkg/util/execdetails/execdetails_test.go b/pkg/util/execdetails/execdetails_test.go index 3020bc7608423..cfe92ca597dee 100644 --- a/pkg/util/execdetails/execdetails_test.go +++ b/pkg/util/execdetails/execdetails_test.go @@ -389,6 +389,11 @@ func TestRUV2MetricsSnapshotCalculateRUValues(t *testing.T) { require.Empty(t, total) require.Empty(t, detail) }) + + t.Run("nil metrics keep tikv and tiflash ru", func(t *testing.T) { + var nilMetrics *RUV2Metrics + require.Equal(t, tikvRU+tiflashRU, nilMetrics.TotalRU(weights, tikvRU, tiflashRU)) + }) } func TestRUV2MetricsSnapshotFreezesRUValues(t *testing.T) { diff --git a/pkg/util/execdetails/ruv2_metrics.go b/pkg/util/execdetails/ruv2_metrics.go index a7e326cd501b3..30bb3d057c411 100644 --- a/pkg/util/execdetails/ruv2_metrics.go +++ b/pkg/util/execdetails/ruv2_metrics.go @@ -529,6 +529,9 @@ func (m *RUV2Metrics) CalculateRUValues(weights RUV2Weights) (tidbRU float64) { // TotalRU returns the statement RU v2 total as TiDB + TiKV + TiFlash. func (m *RUV2Metrics) TotalRU(weights RUV2Weights, tiKVRU, tiFlashRU float64) float64 { + if m == nil { + return tiKVRU + tiFlashRU + } if m.Bypass() { return 0 } From 3a418ad09f36d2de7f92b7863a2f3d4130ad13d0 Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 30 Mar 2026 17:01:27 +0800 Subject: [PATCH 6/8] pkg/session: centralize execute stmt unwrapping Signed-off-by: disksing --- pkg/session/session.go | 76 +++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 35 deletions(-) diff --git a/pkg/session/session.go b/pkg/session/session.go index df9ddeb895434..f834f90d24747 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -1127,15 +1127,19 @@ func (*session) isTxnRetryableError(err error) bool { } func isEndTxnStmt(stmt ast.StmtNode, vars *variable.SessionVars) (bool, error) { - switch n := stmt.(type) { + resolvedStmt, err := resolvePreparedStmt(stmt, vars) + if err != nil { + return false, err + } + if resolvedStmt == nil { + return false, nil + } + if resolvedStmt != stmt { + return isEndTxnStmt(resolvedStmt, vars) + } + switch resolvedStmt.(type) { case *ast.RollbackStmt, *ast.CommitStmt: return true, nil - case *ast.ExecuteStmt: - ps, err := plannercore.GetPreparedStmt(n, vars) - if err != nil { - return false, err - } - return isEndTxnStmt(ps.PreparedAst.Stmt, vars) } return false, nil } @@ -2493,11 +2497,8 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s }) var stmtLabel string - if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok { - prepareStmt, err := plannercore.GetPreparedStmt(execStmt, s.sessionVars) - if err == nil && prepareStmt.PreparedAst != nil { - stmtLabel = stmtctx.GetStmtLabel(ctx, prepareStmt.PreparedAst.Stmt) - } + if resolvedStmt, err := resolvePreparedStmt(stmtNode, s.sessionVars); err == nil && resolvedStmt != nil { + stmtLabel = stmtctx.GetStmtLabel(ctx, resolvedStmt) } if stmtLabel == "" { stmtLabel = stmtctx.GetStmtLabel(ctx, stmtNode) @@ -2734,6 +2735,27 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s var isNextGenForRUV2 = kerneltype.IsNextGen +func resolvePreparedStmt(stmt ast.StmtNode, vars *variable.SessionVars) (ast.StmtNode, error) { + if stmt == nil { + return nil, nil + } + execStmt, ok := stmt.(*ast.ExecuteStmt) + if !ok { + return stmt, nil + } + if vars == nil { + return nil, nil + } + prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars) + if err != nil { + return nil, err + } + if prepareStmt == nil || prepareStmt.PreparedAst == nil { + return nil, nil + } + return prepareStmt.PreparedAst.Stmt, nil +} + func shouldBypass(ctx context.Context, stmtNode ast.StmtNode, sessVars *variable.SessionVars) bool { switch kv.GetInternalSourceType(ctx) { case kv.InternalTxnOthers: @@ -2749,19 +2771,12 @@ func isAnalyzeStatementForRUV2(stmtNode ast.StmtNode, sessVars *variable.Session if stmtNode == nil || sessVars == nil { return false } - switch stmt := stmtNode.(type) { - case *ast.AnalyzeTableStmt: - return true - case *ast.ExecuteStmt: - prepareStmt, err := plannercore.GetPreparedStmt(stmt, sessVars) - if err != nil { - return false - } - _, ok := prepareStmt.PreparedAst.Stmt.(*ast.AnalyzeTableStmt) - return ok - default: + resolvedStmt, err := resolvePreparedStmt(stmtNode, sessVars) + if err != nil || resolvedStmt == nil { return false } + _, ok := resolvedStmt.(*ast.AnalyzeTableStmt) + return ok } func (s *session) GetSQLExecutor() sqlexec.SQLExecutor { @@ -4801,22 +4816,13 @@ func (s *session) shouldUsePessimisticAutoCommit(stmtNode ast.StmtNode) bool { } // isDMLStatement checks if the given statement should use pessimistic-auto-commit. -// It handles EXECUTE unwrapping and properly handles EXPLAIN statements by checking their inner statement. +// It unwraps EXECUTE statements and properly handles EXPLAIN statements by checking their inner statement. func (s *session) isDMLStatement(stmtNode ast.StmtNode) bool { - if stmtNode == nil { + actualStmt, err := resolvePreparedStmt(stmtNode, s.GetSessionVars()) + if err != nil || actualStmt == nil { return false } - // Handle EXECUTE statements - unwrap to get the actual prepared statement - actualStmt := stmtNode - if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok { - prepareStmt, err := plannercore.GetPreparedStmt(execStmt, s.GetSessionVars()) - if err != nil || prepareStmt == nil { - return false - } - actualStmt = prepareStmt.PreparedAst.Stmt - } - // For EXPLAIN statements, check the underlying statement // This ensures EXPLAIN shows the correct plan that would be used if the statement were executed if explainStmt, ok := actualStmt.(*ast.ExplainStmt); ok { From 70e041aa349cefbcb8f7c3f793b48bde50dc8774 Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 30 Mar 2026 18:02:34 +0800 Subject: [PATCH 7/8] pkg/session: restore outer ruv2 metrics after internal sql Signed-off-by: disksing --- pkg/session/session.go | 2 ++ pkg/session/tidb_test.go | 16 ++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/pkg/session/session.go b/pkg/session/session.go index f834f90d24747..f582881dc7236 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2232,6 +2232,7 @@ func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, fu prevSQL := s.sessionVars.StmtCtx.OriginalSQL prevStmtType := s.sessionVars.StmtCtx.StmtType prevTables := s.sessionVars.StmtCtx.Tables + prevRUV2Metrics := s.sessionVars.RUV2Metrics return s, func() { s.sessionVars.AnalyzeVersion = prevStatsVer s.sessionVars.EnableAnalyzeSnapshot = prevAnalyzeSnapshot @@ -2244,6 +2245,7 @@ func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, fu s.sessionVars.StmtCtx.OriginalSQL = prevSQL s.sessionVars.StmtCtx.StmtType = prevStmtType s.sessionVars.StmtCtx.Tables = prevTables + s.sessionVars.RUV2Metrics = prevRUV2Metrics s.sessionVars.MemTracker.Detach() }, nil } diff --git a/pkg/session/tidb_test.go b/pkg/session/tidb_test.go index 660d40be86e4a..413c99107dd14 100644 --- a/pkg/session/tidb_test.go +++ b/pkg/session/tidb_test.go @@ -25,6 +25,8 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/vardef" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/execdetails" + "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/stretchr/testify/require" ) @@ -133,6 +135,20 @@ func TestRUV2SessionParserTotalDoesNotLeakAcrossStandaloneParse(t *testing.T) { require.False(t, shouldBypass(statsCtx, execAnalyzeStmt, se.sessionVars)) require.False(t, shouldBypass(statsCtx, &ast.SelectStmt{}, se.sessionVars)) }) + + t.Run("current-session restricted sql restores outer ruv2 metrics", func(t *testing.T) { + outerCtx := execdetails.ContextWithInitializedExecDetails(context.Background()) + outerMetrics := execdetails.RUV2MetricsFromContext(outerCtx) + require.NotNil(t, outerMetrics) + se.sessionVars.RUV2Metrics = outerMetrics + + internalCtx := kv.WithInternalSourceType(outerCtx, kv.InternalTxnOthers) + _, _, err := se.ExecRestrictedSQL(internalCtx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "select 1") + require.NoError(t, err) + + require.Same(t, outerMetrics, se.sessionVars.RUV2Metrics) + require.False(t, outerMetrics.Bypass()) + }) } func TestCrossKSSessionDistSQLCtxDoesNotExposeTypedNilRUReporter(t *testing.T) { From 607dd3f666c157645429fad53b3575842c09056f Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 30 Mar 2026 19:50:32 +0800 Subject: [PATCH 8/8] fix bazel Signed-off-by: disksing --- pkg/session/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/session/BUILD.bazel b/pkg/session/BUILD.bazel index 49070d0b8a94f..8ad1c907d87cb 100644 --- a/pkg/session/BUILD.bazel +++ b/pkg/session/BUILD.bazel @@ -201,6 +201,7 @@ go_test( "//pkg/util", "//pkg/util/benchdaily", "//pkg/util/chunk", + "//pkg/util/execdetails", "//pkg/util/logutil", "//pkg/util/sqlexec", "@com_github_pingcap_errors//:errors",