12 changes: 6 additions & 6 deletions DEPS.bzl
@@ -5971,13 +5971,13 @@ def go_deps():
name = "com_github_pingcap_tipb",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/tipb",
sha256 = "f0ef12d299ff4b0afca6f29723fa0f50fd6228fd38e1f99d925430f2d2dc7ea9",
strip_prefix = "github.com/pingcap/tipb@v0.0.0-20260202031324-4ce7b6c65c98",
sha256 = "af0d5b3d3d28d8c7e43b446873541c0f53a1c552e78fd2eda33931302c9f62e3",
strip_prefix = "github.com/pingcap/tipb@v0.0.0-20260507102040-d3d6e146648f",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20260202031324-4ce7b6c65c98.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20260202031324-4ce7b6c65c98.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20260202031324-4ce7b6c65c98.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20260202031324-4ce7b6c65c98.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20260507102040-d3d6e146648f.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20260507102040-d3d6e146648f.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20260507102040-d3d6e146648f.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20260507102040-d3d6e146648f.zip",
],
)
go_repository(
2 changes: 1 addition & 1 deletion go.mod
@@ -100,7 +100,7 @@ require (
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e
github.com/pingcap/tipb v0.0.0-20260202031324-4ce7b6c65c98
github.com/pingcap/tipb v0.0.0-20260507102040-d3d6e146648f
github.com/prometheus/client_golang v1.23.0
github.com/prometheus/client_model v0.6.2
github.com/prometheus/common v0.65.0
4 changes: 2 additions & 2 deletions go.sum
@@ -663,8 +663,8 @@ github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGa
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530=
github.com/pingcap/tipb v0.0.0-20260202031324-4ce7b6c65c98 h1:rCUl6auTYEiq+k5hGVDTjInH++wY/XgZ8V5Lp24OaEo=
github.com/pingcap/tipb v0.0.0-20260202031324-4ce7b6c65c98/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pingcap/tipb v0.0.0-20260507102040-d3d6e146648f h1:ld8bQ5d0zh1B0HRbJHiaf2seZvcVV5Ug2rih70uNJMM=
github.com/pingcap/tipb v0.0.0-20260507102040-d3d6e146648f/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
22 changes: 11 additions & 11 deletions pkg/executor/adapter.go
@@ -378,7 +378,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
}

if executor == nil {
b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti)
b := newExecutorBuilder(ctx, a.Ctx, a.InfoSchema, a.Ti)
executor = b.build(a.Plan)
if b.err != nil {
return nil, b.err
@@ -608,7 +608,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}
ctx = a.observeStmtBeginForTopSQL(ctx)

e, err := a.buildExecutor()
e, err := a.buildExecutor(ctx)
if err != nil {
return nil, err
}
@@ -1295,7 +1295,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
a.resetPhaseDurations()

a.inheritContextFromExecuteStmt()
e, err := a.buildExecutor()
e, err := a.buildExecutor(ctx)
if err != nil {
return nil, err
}
@@ -1321,28 +1321,28 @@ type pessimisticTxn interface {
}

// buildExecutor build an executor from plan, prepared statement may need additional procedure.
func (a *ExecStmt) buildExecutor() (exec.Executor, error) {
func (a *ExecStmt) buildExecutor(ctx context.Context) (exec.Executor, error) {
defer func(start time.Time) { a.phaseBuildDurations[0] += time.Since(start) }(time.Now())
ctx := a.Ctx
stmtCtx := ctx.GetSessionVars().StmtCtx
sctx := a.Ctx
stmtCtx := sctx.GetSessionVars().StmtCtx
if _, ok := a.Plan.(*plannercore.Execute); !ok {
if stmtCtx.Priority == mysql.NoPriority && a.LowerPriority {
stmtCtx.Priority = kv.PriorityLow
}
}
if _, ok := a.Plan.(*plannercore.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
if _, ok := a.Plan.(*plannercore.Analyze); ok && sctx.GetSessionVars().InRestrictedSQL {
sctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}

b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti)
b := newExecutorBuilder(ctx, sctx, a.InfoSchema, a.Ti)
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
}

failpoint.Inject("assertTxnManagerAfterBuildExecutor", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterBuildExecutor", true)
sessiontxn.AssertTxnManagerInfoSchema(b.ctx, b.is)
sessiontxn.AssertTxnManagerInfoSchema(b.sctx, b.is)
})

// ExecuteExec is not a real Executor, we only use it to build another Executor from a prepared statement.
@@ -1352,7 +1352,7 @@ func (a *ExecStmt) buildExecutor() (exec.Executor, error) {
return nil, err
}
if executorExec.lowerPriority {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
sctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}
e = executorExec.stmtExec
}
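The adapter.go hunks above are a mechanical plumbing change: buildExecutor now receives the statement's context.Context, and the session context that previously reused the name ctx is renamed sctx so the two no longer shadow each other. A minimal standalone sketch of the same pattern follows; the types and field names are invented for illustration and are not TiDB's.

package main

import (
	"context"
	"fmt"
)

// sessionContext stands in for TiDB's sessionctx.Context in this sketch.
type sessionContext struct{ priority int }

type execStmt struct {
	sc *sessionContext
}

// Before the change, the method took no context.Context and reused the name
// ctx for the session context. After the change, the request context is passed
// in explicitly and the session context is called sctx, so cancellation and
// deadlines are visible to the builder without shadowing.
func (a *execStmt) buildExecutor(ctx context.Context) error {
	sctx := a.sc // was: ctx := a.Ctx
	if err := ctx.Err(); err != nil {
		return err // the request context can now be observed here
	}
	sctx.priority = 1
	return nil
}

func main() {
	a := &execStmt{sc: &sessionContext{}}
	fmt.Println(a.buildExecutor(context.Background())) // <nil>
}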
172 changes: 169 additions & 3 deletions pkg/executor/analyze.go
@@ -24,6 +24,7 @@ import (
"slices"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -34,6 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
@@ -84,6 +86,170 @@ const (
idxTask
)

// flushStatsDeltaForAnalyze flushes pending stats deltas for the tables whose column-analyze
// tasks will capture base count / modify_count from mysql.stats_meta. Without this, a stale
// pre-analyze delta can be applied later and double count rows or modifications.
func flushStatsDeltaForAnalyze(ctx context.Context, sctx sessionctx.Context, plan *core.Analyze) error {
flushObjects := collectStatsDeltaFlushObjectsForAnalyze(plan)
if len(flushObjects) == 0 {
return nil
}
if err := ctx.Err(); err != nil {
return err
}

// HACK: Some tests register in-process TiDB domains but do not start TiDB RPC
// endpoints. Broadcasting FLUSH STATS_DELTA CLUSTER to those mock endpoints can
// spend the TiKV RPC backoff budget before analyze really starts. When the test
// topology cannot receive TiDB broadcast requests, dumping local deltas is the
// only viable approximation.
if intest.InTest {
flushedLocally, err := flushAnalyzeStatsDeltaForTest(ctx, sctx, plan)
if err != nil {
return err
}
if flushedLocally {
return nil
}
}

stmt := &ast.FlushStmt{
Tp: ast.FlushStatsDelta,
IsCluster: true,
FlushObjects: flushObjects,
}
sql, err := restoreFlushStatsDeltaSQL(stmt)
if err != nil {
return err
}
return broadcast(ctx, sctx, sql)
}

// collectStatsDeltaFlushObjectsForAnalyze returns the database-qualified table
// objects whose stats deltas must be flushed before building column analyze
// tasks. Column analyze captures base count / modify_count from mysql.stats_meta,
// so each target table is included once even if it has multiple column tasks.
func collectStatsDeltaFlushObjectsForAnalyze(plan *core.Analyze) []*ast.StatsObject {
flushObjects := make([]*ast.StatsObject, 0, len(plan.ColTasks))
type statsObjectKey struct {
dbName string
tableName string
}
seenObjects := make(map[statsObjectKey]struct{}, len(plan.ColTasks))
appendFlushObject := func(task core.AnalyzeColumnsTask) {
dbName, tableName := task.DBName, task.TableName
if dbName == "" || tableName == "" {
intest.Assert(false, "analyze column task must have database-qualified table name")
return
}
key := statsObjectKey{dbName: dbName, tableName: tableName}
if _, ok := seenObjects[key]; ok {
return
}
seenObjects[key] = struct{}{}
flushObjects = append(flushObjects, &ast.StatsObject{
StatsObjectScope: ast.StatsObjectScopeTable,
DBName: pmodel.NewCIStr(dbName),
TableName: pmodel.NewCIStr(tableName),
})
}
for _, task := range plan.ColTasks {
appendFlushObject(task)
}
return flushObjects
}

func flushAnalyzeStatsDeltaForTest(ctx context.Context, sctx sessionctx.Context, plan *core.Analyze) (bool, error) {
canBroadcast, err := canBroadcastAnalyzeStatsDeltaForTest(ctx)
if err != nil {
return false, err
}
// If every registered TiDB server has a reachable RPC endpoint, use the normal
// broadcast path so RPC-backed tests still exercise the production behavior.
if canBroadcast {
return false, nil
}
targetIDs := collectAnalyzeStatsDeltaTargetIDsForTest(plan)
if len(targetIDs) == 0 {
return false, nil
}
return true, domain.GetDomain(sctx).StatsHandle().DumpStatsDeltaToKV(true, targetIDs...)
}

func canBroadcastAnalyzeStatsDeltaForTest(ctx context.Context) (bool, error) {
servers, err := infosync.GetAllServerInfo(ctx)
if err != nil {
return false, err
}
rpcAddrs := make([]string, 0, len(servers))
for _, server := range servers {
// Keep the same skip behavior as buildTiDBMemCopTasks for placeholder
// nodes that should not receive TiDB-type coprocessor requests.
if server.IP == "<nil>" {
continue
}
// In-process test domains can register server info without starting a
// TiDB RPC listener. In that case AdvertiseAddress stays empty, so a
// normal broadcast would target ":10080" and wait for RPC backoff.
if server.IP == "" {
rpcAddrs = append(rpcAddrs, "")
continue
}
rpcAddrs = append(rpcAddrs, net.JoinHostPort(server.IP, strconv.Itoa(int(server.StatusPort))))
}
return canBroadcastToTiDBRPCForTest(ctx, rpcAddrs), nil
}

func canBroadcastToTiDBRPCForTest(ctx context.Context, rpcAddrs []string) bool {
if len(rpcAddrs) == 0 {
return false
}
for _, addr := range rpcAddrs {
if !isTiDBRPCReachableForTest(ctx, addr) {
return false
}
}
return true
}

func isTiDBRPCReachableForTest(ctx context.Context, addr string) bool {
if addr == "" {
return false
}
dialer := net.Dialer{Timeout: 50 * time.Millisecond}
conn, err := dialer.DialContext(ctx, "tcp", addr)
if err != nil {
return false
}
_ = conn.Close()
return true
}

func collectAnalyzeStatsDeltaTargetIDsForTest(plan *core.Analyze) []int64 {
targetIDs := make([]int64, 0, len(plan.ColTasks))
seenTargetIDs := make(map[int64]struct{}, len(plan.ColTasks))
appendTargetID := func(id int64) {
if _, ok := seenTargetIDs[id]; ok {
return
}
seenTargetIDs[id] = struct{}{}
targetIDs = append(targetIDs, id)
}
for _, task := range plan.ColTasks {
if task.TblInfo == nil {
intest.Assert(false, "analyze column task must have table info")
continue
}
appendTargetID(task.TblInfo.ID)
if partitionInfo := task.TblInfo.GetPartitionInfo(); partitionInfo != nil {
for _, def := range partitionInfo.Definitions {
appendTargetID(def.ID)
}
}
}
return targetIDs
}

// Next implements the Executor Next interface.
// It will collect all the sample task and run them concurrently.
func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
@@ -121,7 +287,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
taskCh := make(chan *analyzeTask, buildStatsConcurrency)
resultsCh := make(chan *statistics.AnalyzeResults, 1)
for i := 0; i < buildStatsConcurrency; i++ {
e.wg.Run(func() { e.analyzeWorker(taskCh, resultsCh) })
e.wg.Run(func() { e.analyzeWorker(ctx, taskCh, resultsCh) })
}
pruneMode := variable.PartitionPruneMode(sessionVars.PartitionPruneMode.Load())
// needGlobalStats used to indicate whether we should merge the partition-level stats to global-level stats.
@@ -492,7 +658,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(
return err
}

func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) {
func (e *AnalyzeExec) analyzeWorker(ctx context.Context, taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) {
var task *analyzeTask
statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
defer func() {
@@ -524,7 +690,7 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
select {
case <-e.errExitCh:
return
case resultsCh <- analyzeColumnsPushDownEntry(e.gp, task.colExec):
case resultsCh <- analyzeColumnsPushDownEntry(ctx, e.gp, task.colExec):
}
case idxTask:
select {
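The test-only fallback added in analyze.go hinges on a cheap all-or-nothing TCP probe: every registered server's status address is dialed with a 50 ms timeout, and a single unreachable endpoint is enough to switch from the cluster-wide FLUSH STATS_DELTA broadcast to a local delta dump. The self-contained sketch below shows only that probe pattern; the helper names and the address are made up for illustration and are not the functions in this diff.

package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

// reachable reports whether a TCP endpoint accepts connections within a short
// deadline; the connection is closed immediately after the dial succeeds.
func reachable(ctx context.Context, addr string) bool {
	if addr == "" {
		return false // a blank advertise address means no RPC listener
	}
	d := net.Dialer{Timeout: 50 * time.Millisecond}
	conn, err := d.DialContext(ctx, "tcp", addr)
	if err != nil {
		return false
	}
	_ = conn.Close()
	return true
}

// canBroadcast mirrors the all-or-nothing decision: broadcast only if every
// endpoint is reachable, otherwise fall back to flushing deltas locally.
func canBroadcast(ctx context.Context, addrs []string) bool {
	if len(addrs) == 0 {
		return false
	}
	for _, addr := range addrs {
		if !reachable(ctx, addr) {
			return false
		}
	}
	return true
}

func main() {
	// 198.51.100.1 is a documentation-only address, so this prints false.
	fmt.Println(canBroadcast(context.Background(), []string{"198.51.100.1:10080"}))
}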
4 changes: 2 additions & 2 deletions pkg/executor/analyze_col.go
@@ -64,9 +64,9 @@ type AnalyzeColumnsExec struct {
memTracker *memory.Tracker
}

func analyzeColumnsPushDownEntry(gp *gp.Pool, e *AnalyzeColumnsExec) *statistics.AnalyzeResults {
func analyzeColumnsPushDownEntry(ctx context.Context, gp *gp.Pool, e *AnalyzeColumnsExec) *statistics.AnalyzeResults {
if e.AnalyzeInfo.StatsVersion >= statistics.Version2 {
return e.toV2().analyzeColumnsPushDownV2(gp)
return e.toV2().analyzeColumnsPushDownV2(ctx, gp)
}
return e.toV1().analyzeColumnsPushDownV1()
}
12 changes: 6 additions & 6 deletions pkg/executor/analyze_col_v2.go
@@ -54,7 +54,7 @@ type AnalyzeColumnsExecV2 struct {
*AnalyzeColumnsExec
}

func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2(gp *gp.Pool) *statistics.AnalyzeResults {
func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2(ctx context.Context, gp *gp.Pool) *statistics.AnalyzeResults {
var ranges []*ranger.Range
if hc := e.handleCols; hc != nil {
if hc.IsInt() {
@@ -94,7 +94,7 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2(gp *gp.Pool) *statistics
return &statistics.AnalyzeResults{Err: err, Job: e.job}
}
idxNDVPushDownCh := make(chan analyzeIndexNDVTotalResult, 1)
e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh, samplingStatsConcurrency)
e.handleNDVForSpecialIndexes(ctx, specialIndexes, idxNDVPushDownCh, samplingStatsConcurrency)
count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(gp, ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency)
if err != nil {
e.memTracker.Release(e.memTracker.BytesConsumed())
@@ -435,7 +435,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
}

// handleNDVForSpecialIndexes deals with the logic to analyze the index containing the virtual column when the mode is full sampling.
func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.IndexInfo, totalResultCh chan analyzeIndexNDVTotalResult, samplingStatsConcurrency int) {
func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(ctx context.Context, indexInfos []*model.IndexInfo, totalResultCh chan analyzeIndexNDVTotalResult, samplingStatsConcurrency int) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze ndv for special index panicked", zap.Any("recover", r), zap.Stack("stack"))
@@ -445,7 +445,7 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In
}
}
}()
tasks := e.buildSubIndexJobForSpecialIndex(indexInfos)
tasks := e.buildSubIndexJobForSpecialIndex(ctx, indexInfos)
taskCh := make(chan *analyzeTask, len(tasks))
for _, task := range tasks {
AddNewAnalyzeJob(e.ctx, task.job)
@@ -526,11 +526,11 @@ func (e *AnalyzeColumnsExecV2) subIndexWorkerForNDV(taskCh chan *analyzeTask, re

// buildSubIndexJobForSpecialIndex builds sub index pushed down task to calculate the NDV information for indexes containing virtual column.
// This is because we cannot push the calculation of the virtual column down to the tikv side.
func (e *AnalyzeColumnsExecV2) buildSubIndexJobForSpecialIndex(indexInfos []*model.IndexInfo) []*analyzeTask {
func (e *AnalyzeColumnsExecV2) buildSubIndexJobForSpecialIndex(ctx context.Context, indexInfos []*model.IndexInfo) []*analyzeTask {
_, offset := timeutil.Zone(e.ctx.GetSessionVars().Location())
tasks := make([]*analyzeTask, 0, len(indexInfos))
sc := e.ctx.GetSessionVars().StmtCtx
concurrency := adaptiveAnlayzeDistSQLConcurrency(context.Background(), e.ctx)
concurrency := adaptiveAnlayzeDistSQLConcurrency(ctx, e.ctx)
for _, indexInfo := range indexInfos {
base := baseAnalyzeExec{
ctx: e.ctx,
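The analyze_col_v2.go hunks only thread the caller's ctx where context.Background() was previously used (for adaptiveAnlayzeDistSQLConcurrency) and pass it through the special-index NDV path, presumably so those steps can observe the statement's cancellation or deadline instead of an unbounded background context. A toy illustration of that difference follows; the helper is hypothetical and not TiDB code.

package main

import (
	"context"
	"fmt"
	"time"
)

// pickConcurrency pretends to probe the cluster: with a cancellable ctx it
// gives up early, while context.Background() always waits out the probe.
func pickConcurrency(ctx context.Context) int {
	select {
	case <-ctx.Done():
		return 1 // fall back to a conservative default when cancelled
	case <-time.After(100 * time.Millisecond): // stand-in for a real probe
		return 8
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the statement was killed before the probe ran
	fmt.Println(pickConcurrency(ctx))                  // 1
	fmt.Println(pickConcurrency(context.Background())) // 8, cannot be cancelled
}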