diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 7561cd227..8c3262d18 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -636,7 +636,7 @@ func (mgtr *Migrator) Migrate() (err error) { if err := mgtr.hooksExecutor.OnRowCopyComplete(); err != nil { return err } - mgtr.printStatus(ForcePrintStatusRule) + mgtr.reportStatus(ForcePrintStatusRule) if mgtr.migrationContext.IsCountingTableRows() { mgtr.migrationContext.Log.Info("stopping query for exact row count, because that can accidentally lock out the cut over") @@ -766,7 +766,7 @@ func (mgtr *Migrator) Revert() error { } }() - mgtr.printStatus(ForcePrintStatusRule) + mgtr.reportStatus(ForcePrintStatusRule) var retrier func(func() error, ...bool) error if mgtr.migrationContext.CutOverExponentialBackoff { retrier = mgtr.retryOperationWithExponentialBackoff @@ -944,7 +944,7 @@ func (mgtr *Migrator) waitForEventsUpToLock() error { waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime) mgtr.migrationContext.Log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration) - mgtr.printStatus(ForcePrintStatusAndHintRule) + mgtr.reportStatus(ForcePrintStatusAndHintRule) return nil } @@ -1087,7 +1087,7 @@ func (mgtr *Migrator) atomicCutOver() (err error) { // initiateServer begins listening on unix socket/tcp for incoming interactive commands func (mgtr *Migrator) initiateServer() (err error) { var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) { - mgtr.printStatus(rule, writer) + mgtr.reportStatus(rule, writer) } mgtr.server = NewServer(mgtr.migrationContext, mgtr.hooksExecutor, f) if err := mgtr.server.BindSocketFile(); err != nil { @@ -1167,9 +1167,15 @@ func (mgtr *Migrator) initiateInspector() (err error) { return nil } -// initiateStatus sets and activates the printStatus() ticker +// reportStatus samples migration progress, which also records the progress percent and ETA on the migration context, and then passes the snapshot to printStatus, which decides from `rule` whether anything is written. 
+func (mgtr *Migrator) reportStatus(rule PrintStatusRule, writers ...io.Writer) { + snap := mgtr.sampleMigrationProgress() + mgtr.printStatus(rule, snap, writers...) +} + +// initiateStatus sets and activates the reportStatus() ticker. func (mgtr *Migrator) initiateStatus() { - mgtr.printStatus(ForcePrintStatusAndHintRule) + mgtr.reportStatus(ForcePrintStatusAndHintRule) ticker := time.NewTicker(time.Second) defer ticker.Stop() var previousCount int64 @@ -1177,7 +1183,7 @@ func (mgtr *Migrator) initiateStatus() { if atomic.LoadInt64(&mgtr.finishedMigrating) > 0 { return } - go mgtr.printStatus(HeuristicPrintStatusRule) + go mgtr.reportStatus(HeuristicPrintStatusRule) totalCopied := atomic.LoadInt64(&mgtr.migrationContext.TotalRowsCopied) if previousCount > 0 { copiedThisLoop := totalCopied - previousCount @@ -1372,55 +1378,35 @@ func (mgtr *Migrator) shouldPrintMigrationStatusHint(rule PrintStatusRule, elaps // `rule` indicates the type of output expected. // By default the status is written to standard output, but other writers can // be used as well. -func (mgtr *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { +func (mgtr *Migrator) printStatus(rule PrintStatusRule, snap migrationProgressSnapshot, writers ...io.Writer) { if rule == NoPrintStatusRule { return } writers = append(writers, os.Stdout) - elapsedTime := mgtr.migrationContext.ElapsedTime() - elapsedSeconds := int64(elapsedTime.Seconds()) - totalRowsCopied := mgtr.migrationContext.GetTotalRowsCopied() - rowsEstimate := atomic.LoadInt64(&mgtr.migrationContext.RowsEstimate) + atomic.LoadInt64(&mgtr.migrationContext.RowsDeltaEstimate) - if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { - // Done copying rows. The totalRowsCopied value is the de-facto number of rows, - // and there is no further need to keep updating the value. 
- rowsEstimate = totalRowsCopied - } - - // we take the opportunity to update migration context with progressPct - progressPct := mgtr.getProgressPercent(rowsEstimate) - mgtr.migrationContext.SetProgressPct(progressPct) - // Before status, let's see if we should print a nice reminder for what exactly we're doing here. - if mgtr.shouldPrintMigrationStatusHint(rule, elapsedSeconds) { + if mgtr.shouldPrintMigrationStatusHint(rule, snap.elapsedSeconds) { mgtr.printMigrationStatusHint(writers...) } - // Get state + ETA - state, eta, etaDuration := mgtr.getMigrationStateAndETA(rowsEstimate) - mgtr.migrationContext.SetETADuration(etaDuration) - - if !mgtr.shouldPrintStatus(rule, elapsedSeconds, etaDuration) { + if !mgtr.shouldPrintStatus(rule, snap.elapsedSeconds, snap.etaDuration) { return } - currentBinlogCoordinates := mgtr.eventsStreamer.GetCurrentBinlogCoordinates() - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", - totalRowsCopied, rowsEstimate, progressPct, - atomic.LoadInt64(&mgtr.migrationContext.TotalDMLEventsApplied), - len(mgtr.applyEventsQueue), cap(mgtr.applyEventsQueue), - base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(mgtr.migrationContext.ElapsedRowCopyTime()), - currentBinlogCoordinates.DisplayString(), - mgtr.migrationContext.GetCurrentLagDuration().Seconds(), - mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(), - state, - eta, + snap.totalRowsCopied, snap.rowsEstimate, snap.progressPct, + snap.dmlApplied, + snap.applyEventsBacklog, snap.applyEventsCapacity, + base.PrettifyDurationOutput(snap.elapsedTime), base.PrettifyDurationOutput(snap.elapsedRowCopyTime), + snap.streamerBinlogPosition, + snap.replicationLagSeconds, + snap.heartbeatLagSeconds, + snap.state, + snap.eta, ) mgtr.applier.WriteChangelog( fmt.Sprintf("copy iteration %d at %d", mgtr.migrationContext.GetIteration(), 
time.Now().Unix()), - state, + snap.state, ) w := io.MultiWriter(writers...) fmt.Fprintln(w, status) @@ -1434,7 +1420,7 @@ func (mgtr *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { mgtr.migrationContext.Log.Info(strings.Replace(status, "%", "%%", 1)) hooksStatusIntervalSec := mgtr.migrationContext.HooksStatusIntervalSec - if hooksStatusIntervalSec > 0 && elapsedSeconds%hooksStatusIntervalSec == 0 { + if hooksStatusIntervalSec > 0 && snap.elapsedSeconds%hooksStatusIntervalSec == 0 { mgtr.hooksExecutor.OnStatus(status) } } diff --git a/go/logic/progress_snapshot.go b/go/logic/progress_snapshot.go new file mode 100644 index 000000000..375532cca --- /dev/null +++ b/go/logic/progress_snapshot.go @@ -0,0 +1,74 @@ +/* + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "sync/atomic" + "time" +) + +// migrationProgressSnapshot holds the values gathered for one status report: row-copy counts and progress, +// applied DML events, apply-queue backlog and capacity, state and ETA, elapsed times, streamer binlog position, and lag. +type migrationProgressSnapshot struct { + totalRowsCopied int64 + rowsEstimate int64 + progressPct float64 + dmlApplied int64 + applyEventsBacklog int + applyEventsCapacity int + state string + eta string + etaDuration time.Duration + elapsedTime time.Duration + elapsedRowCopyTime time.Duration + elapsedSeconds int64 + streamerBinlogPosition string + replicationLagSeconds float64 + heartbeatLagSeconds float64 +} + +func (mgtr *Migrator) migrationProgressSnapshot() migrationProgressSnapshot { + totalRowsCopied := mgtr.migrationContext.GetTotalRowsCopied() + rowsEstimate := atomic.LoadInt64(&mgtr.migrationContext.RowsEstimate) + atomic.LoadInt64(&mgtr.migrationContext.RowsDeltaEstimate) + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { + // Done copying rows. The totalRowsCopied value is the de-facto number of rows, + // and there is no further need to keep updating the value. 
+ rowsEstimate = totalRowsCopied + } + progressPct := mgtr.getProgressPercent(rowsEstimate) + state, eta, etaDuration := mgtr.getMigrationStateAndETA(rowsEstimate) + elapsedTime := mgtr.migrationContext.ElapsedTime() + + streamerBinlogPosition := "" + if mgtr.eventsStreamer != nil { + streamerBinlogPosition = mgtr.eventsStreamer.GetCurrentBinlogCoordinates().DisplayString() + } + + return migrationProgressSnapshot{ + totalRowsCopied: totalRowsCopied, + rowsEstimate: rowsEstimate, + progressPct: progressPct, + dmlApplied: atomic.LoadInt64(&mgtr.migrationContext.TotalDMLEventsApplied), + applyEventsBacklog: len(mgtr.applyEventsQueue), + applyEventsCapacity: cap(mgtr.applyEventsQueue), + state: state, + eta: eta, + etaDuration: etaDuration, + elapsedTime: elapsedTime, + elapsedRowCopyTime: mgtr.migrationContext.ElapsedRowCopyTime(), + elapsedSeconds: int64(elapsedTime.Seconds()), + streamerBinlogPosition: streamerBinlogPosition, + replicationLagSeconds: mgtr.migrationContext.GetCurrentLagDuration().Seconds(), + heartbeatLagSeconds: mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(), + } +} + +func (mgtr *Migrator) sampleMigrationProgress() migrationProgressSnapshot { + snap := mgtr.migrationProgressSnapshot() + mgtr.migrationContext.SetProgressPct(snap.progressPct) + mgtr.migrationContext.SetETADuration(snap.etaDuration) + return snap +} diff --git a/go/logic/progress_snapshot_test.go b/go/logic/progress_snapshot_test.go new file mode 100644 index 000000000..1e42faf13 --- /dev/null +++ b/go/logic/progress_snapshot_test.go @@ -0,0 +1,76 @@ +/* + Copyright 2022 GitHub Inc. 
+ See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "bytes" + "io" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/github/gh-ost/go/base" +) + +func TestMigrationProgressSnapshotCapturesBacklogAndLag(t *testing.T) { + ctx := base.NewMigrationContext() + migrator := NewMigrator(ctx, "test") + atomic.StoreInt64(&ctx.TotalRowsCopied, 100) + atomic.StoreInt64(&ctx.CurrentLag, int64(3*time.Second)) + ctx.SetLastHeartbeatOnChangelogTime(time.Now().Add(-2 * time.Second)) + + snap := migrator.migrationProgressSnapshot() + assert.Equal(t, 0, snap.applyEventsBacklog) + assert.Equal(t, cap(migrator.applyEventsQueue), snap.applyEventsCapacity) + assert.InDelta(t, 3.0, snap.replicationLagSeconds, 0.01) + assert.InDelta(t, 2.0, snap.heartbeatLagSeconds, 0.5) + assert.Empty(t, snap.streamerBinlogPosition) +} + +func TestMigrationProgressSnapshotWhenRowCopyComplete(t *testing.T) { + ctx := base.NewMigrationContext() + migrator := NewMigrator(ctx, "test") + atomic.StoreInt64(&ctx.TotalRowsCopied, 5000) + atomic.StoreInt64(&ctx.RowsEstimate, 10000) + atomic.StoreInt64(&migrator.rowCopyCompleteFlag, 1) + + snap := migrator.migrationProgressSnapshot() + assert.Equal(t, int64(5000), snap.totalRowsCopied) + assert.Equal(t, int64(5000), snap.rowsEstimate) + assert.InDelta(t, 100.0, snap.progressPct, 0.01) +} + +func TestReportStatusSamplesProgress(t *testing.T) { + ctx := base.NewMigrationContext() + atomic.StoreInt64(&ctx.TotalRowsCopied, 1000) + atomic.StoreInt64(&ctx.RowsEstimate, 5000) + + migrator := NewMigrator(ctx, "test") + migrator.reportStatus(NoPrintStatusRule, io.Discard) + assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01) +} + +func TestReportStatusUpdatesContextWhenPrintSuppressed(t *testing.T) { + ctx := base.NewMigrationContext() + ctx.StartTime = time.Now().Add(-99 * time.Second) + atomic.StoreInt64(&ctx.TotalRowsCopied, 1000) + 
atomic.StoreInt64(&ctx.RowsEstimate, 5000) + atomic.StoreInt64(&ctx.EtaRowsPerSecond, 1) + + migrator := NewMigrator(ctx, "test") + snap := migrator.migrationProgressSnapshot() + require.False(t, migrator.shouldPrintStatus(HeuristicPrintStatusRule, snap.elapsedSeconds, snap.etaDuration)) + + var buf bytes.Buffer + migrator.printStatus(HeuristicPrintStatusRule, snap, &buf) + assert.Empty(t, buf.String(), "heuristic rule should suppress status line output") + + migrator.reportStatus(HeuristicPrintStatusRule, io.Discard) + assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01) +}