Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ type MigrationContext struct {
AbortError error
abortMutex *sync.Mutex

Metrics metrics.MemStatsGaugeEmitter
Metrics metrics.Emitter

OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
Expand Down
58 changes: 54 additions & 4 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func (mgtr *Migrator) Migrate() (err error) {
} else {
retrier = mgtr.retryOperation
}
if err := retrier(mgtr.cutOver); err != nil {
if err := mgtr.cutOverWithMetrics(retrier); err != nil {
return err
}
atomic.StoreInt64(&mgtr.migrationContext.CutOverCompleteFlag, 1)
Expand Down Expand Up @@ -777,7 +777,7 @@ func (mgtr *Migrator) Revert() error {
if err := mgtr.hooksExecutor.OnBeforeCutOver(); err != nil {
return err
}
if err := retrier(mgtr.cutOver); err != nil {
if err := mgtr.cutOverWithMetrics(retrier); err != nil {
return err
}
atomic.StoreInt64(&mgtr.migrationContext.CutOverCompleteFlag, 1)
Expand All @@ -797,6 +797,28 @@ func (mgtr *Migrator) ExecOnFailureHook() (err error) {
return mgtr.hooksExecutor.OnFailure()
}

func (mgtr *Migrator) cutOverWithMetrics(retrier func(func() error, ...bool) error) error {
return mgtr.cutOverOperationWithMetrics(retrier, mgtr.cutOver)
}

func (mgtr *Migrator) cutOverOperationWithMetrics(retrier func(func() error, ...bool) error, operation func() error) error {
cutOverStartTime := time.Now()
err := retrier(func() error {
err := operation()
if err != nil {
metrics.RecordCutOverAttempt(mgtr.migrationContext.Metrics, metrics.CutOverOutcomeRetry)
return err
}
metrics.RecordCutOverAttempt(mgtr.migrationContext.Metrics, metrics.CutOverOutcomeSuccess)
return nil
})
if err != nil {
metrics.RecordCutOverAttempt(mgtr.migrationContext.Metrics, metrics.CutOverOutcomeAbort)
}
metrics.RecordCutOverTotal(mgtr.migrationContext.Metrics, time.Since(cutOverStartTime), err)
return err
}

func (mgtr *Migrator) handleCutOverResult(cutOverError error) (err error) {
if mgtr.migrationContext.TestOnReplica {
// We're merely testing, we don't want to keep this state. Rollback the renames as possible
Expand Down Expand Up @@ -959,9 +981,12 @@ func (mgtr *Migrator) cutOverTwoStep() (err error) {
defer atomic.StoreInt64(&mgtr.migrationContext.InCutOverCriticalSectionFlag, 0)
atomic.StoreInt64(&mgtr.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)

phaseStartTime := time.Now()
if err := mgtr.retryOperation(mgtr.applier.LockOriginalTable); err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseOriginalTableLock, time.Since(phaseStartTime), err)
return err
}
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseOriginalTableLock, time.Since(phaseStartTime), nil)

if err := mgtr.retryOperation(mgtr.waitForEventsUpToLock); err != nil {
return err
Expand All @@ -972,12 +997,19 @@ func (mgtr *Migrator) cutOverTwoStep() (err error) {
return err
}
}
phaseStartTime = time.Now()
if err := mgtr.retryOperation(mgtr.applier.SwapTablesQuickAndBumpy); err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err)
return err
}
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), nil)

phaseStartTime = time.Now()
if err := mgtr.retryOperation(mgtr.applier.UnlockTables); err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(phaseStartTime), err)
return err
}
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(phaseStartTime), nil)

lockAndRenameDuration := mgtr.migrationContext.RenameTablesEndTime.Sub(mgtr.migrationContext.LockTablesStartTime)
renameDuration := mgtr.migrationContext.RenameTablesEndTime.Sub(mgtr.migrationContext.RenameTablesStartTime)
Expand All @@ -1001,14 +1033,17 @@ func (mgtr *Migrator) atomicCutOver() (err error) {
tableLocked := make(chan error, 2)
tableUnlocked := make(chan error, 2)
var renameLockSessionId int64
phaseStartTime := time.Now()
go func() {
if err := mgtr.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &renameLockSessionId); err != nil {
mgtr.migrationContext.Log.Errore(err)
}
}()
if err := <-tableLocked; err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicLock, time.Since(phaseStartTime), err)
return mgtr.migrationContext.Log.Errore(err)
}
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicLock, time.Since(phaseStartTime), nil)
lockOriginalSessionId := <-lockOriginalSessionIdChan
mgtr.migrationContext.Log.Infof("Session locking original & magic tables is %+v", lockOriginalSessionId)
// At this point we know the original table is locked.
Expand All @@ -1026,7 +1061,8 @@ func (mgtr *Migrator) atomicCutOver() (err error) {

// Step 2
// We now attempt an atomic RENAME on original & ghost tables, and expect it to block.
mgtr.migrationContext.RenameTablesStartTime = time.Now()
phaseStartTime = time.Now()
mgtr.migrationContext.RenameTablesStartTime = phaseStartTime

var tableRenameKnownToHaveFailed int64
renameSessionIdChan := make(chan int64, 2)
Expand All @@ -1051,6 +1087,7 @@ func (mgtr *Migrator) atomicCutOver() (err error) {
}
// Wait for the RENAME to appear in PROCESSLIST
if err := mgtr.retryOperation(waitForRename, true); err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err)
// Abort! Release the lock
okToUnlockTable <- true
return err
Expand All @@ -1059,21 +1096,27 @@ func (mgtr *Migrator) atomicCutOver() (err error) {
mgtr.migrationContext.Log.Infof("Found atomic RENAME to be blocking, as expected. Double checking the lock is still in place (though I don't strictly have to)")
}
if err := mgtr.applier.ExpectUsedLock(lockOriginalSessionId); err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), err)
// Abort operation. Just make sure to drop the magic table.
return mgtr.migrationContext.Log.Errore(err)
}
mgtr.migrationContext.Log.Infof("Connection holding lock on original table still exists")

metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseMagicRename, time.Since(phaseStartTime), nil)

// Now that we've found the RENAME blocking, AND the locking connection still alive,
// we know it is safe to proceed to release the lock

renameLockSessionId = renameSessionId
unlockStartTime := time.Now()
okToUnlockTable <- true
// BAM! magic table dropped, original table lock is released
// -> RENAME released -> queries on original are unblocked.
if err := <-tableUnlocked; err != nil {
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(unlockStartTime), err)
return mgtr.migrationContext.Log.Errore(err)
}
metrics.RecordCutOverPhase(mgtr.migrationContext.Metrics, metrics.CutOverPhaseUnlock, time.Since(unlockStartTime), nil)
if err := <-tablesRenamed; err != nil {
return mgtr.migrationContext.Log.Errore(err)
}
Expand Down Expand Up @@ -1168,7 +1211,7 @@ func (mgtr *Migrator) initiateInspector() (err error) {
return nil
}

// emitProgressMetrics emits StatsD gauges from a progress snapshot.
// emitProgressMetrics emits StatsD gauges
func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) {
metrics.EmitProgressGauges(
mgtr.migrationContext.Metrics,
Expand All @@ -1181,6 +1224,13 @@ func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) {
snap.applyEventsBacklog,
snap.applyEventsCapacity,
)
isThrottled, _, _ := mgtr.migrationContext.IsThrottled()
metrics.EmitLagGauges(
mgtr.migrationContext.Metrics,
snap.replicationLagSeconds,
snap.heartbeatLagSeconds,
isThrottled,
)
}

// reportStatus samples progress, emits metrics, and optionally prints status output.
Expand Down
121 changes: 112 additions & 9 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/metrics"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
"github.com/testcontainers/testcontainers-go"
Expand Down Expand Up @@ -410,11 +411,92 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) {
type progressGaugeSpy struct {
names []string
values []float64
tags [][]string
}

func (s *progressGaugeSpy) Gauge(name string, value float64, _ ...string) {
func (s *progressGaugeSpy) Gauge(name string, value float64, tags ...string) {
s.names = append(s.names, name)
s.values = append(s.values, value)
s.tags = append(s.tags, append([]string(nil), tags...))
}

func (s *progressGaugeSpy) Count(name string, value int64, tags ...string) {
}

func (s *progressGaugeSpy) Histogram(name string, value float64, tags ...string) {
}

type cutOverMetricsSpy struct {
countNames []string
countTags [][]string
histogramNames []string
histogramTags [][]string
}

func (s *cutOverMetricsSpy) Gauge(_ string, _ float64, _ ...string) {}

func (s *cutOverMetricsSpy) Count(name string, _ int64, tags ...string) {
s.countNames = append(s.countNames, name)
s.countTags = append(s.countTags, append([]string(nil), tags...))
}

func (s *cutOverMetricsSpy) Histogram(name string, _ float64, tags ...string) {
s.histogramNames = append(s.histogramNames, name)
s.histogramTags = append(s.histogramTags, append([]string(nil), tags...))
}

func TestCutOverOperationWithMetricsRetryThenSuccess(t *testing.T) {
spy := &cutOverMetricsSpy{}
ctx := base.NewMigrationContext()
ctx.Metrics = spy
migrator := NewMigrator(ctx, "test")

attempts := 0
retrier := func(operation func() error, _ ...bool) error {
for {
err := operation()
if err == nil {
return nil
}
if attempts >= 2 {
return err
}
}
}
operation := func() error {
attempts++
if attempts == 1 {
return errors.New("transient cutover failure")
}
return nil
}

require.NoError(t, migrator.cutOverOperationWithMetrics(retrier, operation))
assert.Equal(t, []string{"cut_over.attempts_total", "cut_over.attempts_total"}, spy.countNames)
assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeRetry}, {"outcome:" + metrics.CutOverOutcomeSuccess}}, spy.countTags)
assert.Equal(t, []string{"cut_over.total_duration_milliseconds"}, spy.histogramNames)
assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeSuccess}}, spy.histogramTags)
}

func TestCutOverOperationWithMetricsAbort(t *testing.T) {
spy := &cutOverMetricsSpy{}
ctx := base.NewMigrationContext()
ctx.Metrics = spy
migrator := NewMigrator(ctx, "test")
cutOverErr := errors.New("cutover failed")

retrier := func(operation func() error, _ ...bool) error {
return operation()
}
operation := func() error {
return cutOverErr
}

require.ErrorIs(t, migrator.cutOverOperationWithMetrics(retrier, operation), cutOverErr)
assert.Equal(t, []string{"cut_over.attempts_total", "cut_over.attempts_total"}, spy.countNames)
assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeRetry}, {"outcome:" + metrics.CutOverOutcomeAbort}}, spy.countTags)
assert.Equal(t, []string{"cut_over.total_duration_milliseconds"}, spy.histogramNames)
assert.Equal(t, [][]string{{"outcome:" + metrics.CutOverOutcomeAbort}}, spy.histogramTags)
}

func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) {
Expand All @@ -427,24 +509,26 @@ func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) {

migrator := NewMigrator(ctx, "test")
migrator.reportStatus(NoPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, []string{
"row_copy.rows_copied",
"row_copy.rows_estimate",
"dml.events_applied",
"binlog.backlog_size",
"binlog.backlog_capacity",
"binlog.backlog_utilization",
"lag.replication_seconds",
"lag.heartbeat_seconds",
}, spy.names)
assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01)

spy.names = nil
spy.values = nil
atomic.StoreInt64(&ctx.TotalDMLEventsApplied, 100)
migrator.reportStatus(NoPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
require.Len(t, spy.names, 8)
assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
}

func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) {
Expand All @@ -463,7 +547,7 @@ func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) {
migrator.reportStatus(NoPrintStatusRule, io.Discard)

capacity := float64(cap(migrator.applyEventsQueue))
require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, float64(2), spy.values[3])
assert.Equal(t, capacity, spy.values[4])
assert.InDelta(t, 2/capacity, spy.values[5], 1e-9)
Expand All @@ -480,7 +564,7 @@ func TestReportStatusEmitsGaugesWhenRowCopyComplete(t *testing.T) {
atomic.StoreInt64(&migrator.rowCopyCompleteFlag, 1)
migrator.reportStatus(NoPrintStatusRule, io.Discard)

require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, float64(5000), spy.values[0])
assert.Equal(t, float64(5000), spy.values[1], "rows_estimate tracks rows_copied when row copy is complete")
}
Expand All @@ -500,8 +584,27 @@ func TestReportStatusEmitsGaugesWhenPrintSuppressed(t *testing.T) {
require.False(t, migrator.shouldPrintStatus(HeuristicPrintStatusRule, snap.elapsedSeconds, etaDuration))

migrator.reportStatus(HeuristicPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
require.Len(t, spy.names, 8)
assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
}

func TestReportStatusEmitsLagGaugesWhenThrottled(t *testing.T) {
spy := &progressGaugeSpy{}
ctx := base.NewMigrationContext()
ctx.Metrics = spy
ctx.SetThrottled(true, "max-lag-millis", base.NoThrottleReasonHint)
atomic.StoreInt64(&ctx.CurrentLag, int64(5*time.Second))
ctx.SetLastHeartbeatOnChangelogTime(time.Now().Add(-4 * time.Second))

migrator := NewMigrator(ctx, "test")
migrator.reportStatus(NoPrintStatusRule, io.Discard)

require.GreaterOrEqual(t, len(spy.names), 8)
assert.Equal(t, "lag.replication_seconds", spy.names[6])
assert.Equal(t, "lag.heartbeat_seconds", spy.names[7])
require.Len(t, spy.tags[6], 1)
assert.Equal(t, "throttled:true", spy.tags[6][0])
assert.Equal(t, "throttled:true", spy.tags[7][0])
}

func TestMigratorShouldPrintStatus(t *testing.T) {
Expand Down
31 changes: 0 additions & 31 deletions go/metrics/binlog_backlog.go

This file was deleted.

Loading
Loading