-
Notifications
You must be signed in to change notification settings - Fork 6.2k
ttl: honor scan task cancellation across statement boundaries #67285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f2fc6c3
dfb47df
d9cb0d0
92aba7f
c04a5a1
7659f52
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -213,21 +213,25 @@ func (mgr *TaskManager) WithNewSession(fn func(se sessionctx.Context) error) err | |
| func (mgr *TaskManager) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error { | ||
| ctx = clitutil.WithInternalSourceType(ctx, kv.InternalDistTask) | ||
| return mgr.WithNewSession(func(se sessionctx.Context) (err error) { | ||
| // Keep BEGIN on the SQL path so the session enters transaction mode with the usual statement semantics. | ||
| // Commit / rollback use session methods instead, because cleanup still has to finish after caller | ||
| // cancellation and issuing SQL text there can leave the pooled internal session with a live txn. | ||
| _, err = sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), "begin") | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| success := false | ||
| defer func() { | ||
| sql := "rollback" | ||
| if success { | ||
| sql = "commit" | ||
| } | ||
| _, commitErr := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), sql) | ||
| if err == nil && commitErr != nil { | ||
| err = commitErr | ||
| commitErr := se.CommitTxn(ctx) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Follow-up note from the repeated CI failures: the stronger boundary-cancel semantics surfaced a latent cleanup bug here.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you put this inside the comment on why we use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed in |
||
| if err == nil && commitErr != nil { | ||
| err = commitErr | ||
| } | ||
| return | ||
| } | ||
|
|
||
| se.RollbackTxn(clitutil.WithInternalSourceType(context.Background(), kv.InternalDistTask)) | ||
| }() | ||
|
|
||
| if err = fn(se); err != nil { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2441,6 +2441,10 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s | |
| if err := executor.ResetContextOfStmt(s, stmtNode); err != nil { | ||
| return nil, err | ||
| } | ||
| // ResetContextOfStmt clears SQLKiller, so honor a canceled caller before executing the next statement. | ||
| if err := ctx.Err(); err != nil { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Root-cause note: this re-check is the semantic center of the fix. |
||
| return nil, err | ||
| } | ||
| ruv2Metrics := execdetails.RUV2MetricsFromContext(ctx) | ||
| if ruv2Metrics == nil { | ||
| ruv2Metrics = execdetails.NewRUV2Metrics() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -137,13 +137,14 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s | |
| } | ||
|
|
||
| func (t *ttlScanTask) doScanWithSession(ctx context.Context, delCh chan<- *ttlDeleteTask, rawSess session.Session) error { | ||
| // TODO: merge the ctx and the taskCtx in ttl scan task, to allow both "cancel" and gracefully stop workers | ||
| // now, the taskCtx is only check at the beginning of every loop | ||
| taskCtx := t.ctx | ||
| tracer := metrics.PhaseTracerFromCtx(ctx) | ||
| defer tracer.EnterPhase(tracer.Phase()) | ||
|
|
||
| tracer.EnterPhase(metrics.PhaseOther) | ||
| // Keep the SQL execution context canceled when either the worker or the TTL task stops. | ||
| scanCtx, cancelScanCtx := context.WithCancel(ctx) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| defer cancelScanCtx() | ||
| doScanFinished, setDoScanFinished := context.WithCancel(context.Background()) | ||
| wg := util.WaitGroupWrapper{} | ||
| wg.Run(func() { | ||
|
|
@@ -153,6 +154,7 @@ func (t *ttlScanTask) doScanWithSession(ctx context.Context, delCh chan<- *ttlDe | |
| case <-doScanFinished.Done(): | ||
| return | ||
| } | ||
| cancelScanCtx() | ||
| logger := t.taskLogger(logutil.BgLogger()) | ||
| logger.Info("kill the running statement in scan task because the task or worker cancelled") | ||
| rawSess.KillStmt() | ||
|
|
@@ -201,7 +203,7 @@ func (t *ttlScanTask) doScanWithSession(ctx context.Context, delCh chan<- *ttlDe | |
| ) | ||
| } | ||
|
|
||
| sess, restoreSession, err := NewScanSession(rawSess, t.tbl, t.ExpireTime) | ||
| sess, restoreSession, err := NewScanSession(scanCtx, rawSess, t.tbl, t.ExpireTime) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
@@ -242,11 +244,11 @@ func (t *ttlScanTask) doScanWithSession(ctx context.Context, delCh chan<- *ttlDe | |
| } | ||
|
|
||
| sqlStart := time.Now() | ||
| rows, retryable, sqlErr := sess.ExecuteSQLWithCheck(ctx, sql) | ||
| rows, retryable, sqlErr := sess.ExecuteSQLWithCheck(scanCtx, sql) | ||
| selectInterval := time.Since(sqlStart) | ||
| if sqlErr != nil { | ||
| metrics.SelectErrorDuration.Observe(selectInterval.Seconds()) | ||
| needRetry := retryable && retryTimes < scanTaskExecuteSQLMaxRetry && ctx.Err() == nil && t.ctx.Err() == nil | ||
| needRetry := retryable && retryTimes < scanTaskExecuteSQLMaxRetry && scanCtx.Err() == nil | ||
| logutil.BgLogger().Warn("execute query for ttl scan task failed", | ||
| zap.String("SQL", sql), | ||
| zap.Int("retryTimes", retryTimes), | ||
|
|
@@ -262,8 +264,8 @@ func (t *ttlScanTask) doScanWithSession(ctx context.Context, delCh chan<- *ttlDe | |
|
|
||
| tracer.EnterPhase(metrics.PhaseWaitRetry) | ||
| select { | ||
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| case <-scanCtx.Done(): | ||
| return scanCtx.Err() | ||
| case <-time.After(scanTaskExecuteSQLRetryInterval): | ||
| } | ||
| tracer.EnterPhase(metrics.PhaseOther) | ||
|
|
@@ -289,8 +291,8 @@ func (t *ttlScanTask) doScanWithSession(ctx context.Context, delCh chan<- *ttlDe | |
|
|
||
| tracer.EnterPhase(metrics.PhaseDispatch) | ||
| select { | ||
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| case <-scanCtx.Done(): | ||
| return scanCtx.Err() | ||
| case delCh <- delTask: | ||
| t.statistics.IncTotalRows(len(lastResult)) | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.