Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 27 additions & 41 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ func (mgtr *Migrator) Migrate() (err error) {
if err := mgtr.hooksExecutor.OnRowCopyComplete(); err != nil {
return err
}
mgtr.printStatus(ForcePrintStatusRule)
mgtr.reportStatus(ForcePrintStatusRule)

if mgtr.migrationContext.IsCountingTableRows() {
mgtr.migrationContext.Log.Info("stopping query for exact row count, because that can accidentally lock out the cut over")
Expand Down Expand Up @@ -766,7 +766,7 @@ func (mgtr *Migrator) Revert() error {
}
}()

mgtr.printStatus(ForcePrintStatusRule)
mgtr.reportStatus(ForcePrintStatusRule)
var retrier func(func() error, ...bool) error
if mgtr.migrationContext.CutOverExponentialBackoff {
retrier = mgtr.retryOperationWithExponentialBackoff
Expand Down Expand Up @@ -944,7 +944,7 @@ func (mgtr *Migrator) waitForEventsUpToLock() error {
waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime)

mgtr.migrationContext.Log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration)
mgtr.printStatus(ForcePrintStatusAndHintRule)
mgtr.reportStatus(ForcePrintStatusAndHintRule)

return nil
}
Expand Down Expand Up @@ -1087,7 +1087,7 @@ func (mgtr *Migrator) atomicCutOver() (err error) {
// initiateServer begins listening on unix socket/tcp for incoming interactive commands
func (mgtr *Migrator) initiateServer() (err error) {
var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) {
mgtr.printStatus(rule, writer)
mgtr.reportStatus(rule, writer)
}
mgtr.server = NewServer(mgtr.migrationContext, mgtr.hooksExecutor, f)
if err := mgtr.server.BindSocketFile(); err != nil {
Expand Down Expand Up @@ -1167,17 +1167,23 @@ func (mgtr *Migrator) initiateInspector() (err error) {
return nil
}

// initiateStatus sets and activates the printStatus() ticker
// reportStatus samples progress and optionally prints status output.
func (mgtr *Migrator) reportStatus(rule PrintStatusRule, writers ...io.Writer) {
snap := mgtr.sampleMigrationProgress()
mgtr.printStatus(rule, snap, writers...)
}

// initiateStatus sets and activates the reportStatus() ticker.
func (mgtr *Migrator) initiateStatus() {
mgtr.printStatus(ForcePrintStatusAndHintRule)
mgtr.reportStatus(ForcePrintStatusAndHintRule)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var previousCount int64
for range ticker.C {
if atomic.LoadInt64(&mgtr.finishedMigrating) > 0 {
return
}
go mgtr.printStatus(HeuristicPrintStatusRule)
go mgtr.reportStatus(HeuristicPrintStatusRule)
totalCopied := atomic.LoadInt64(&mgtr.migrationContext.TotalRowsCopied)
if previousCount > 0 {
copiedThisLoop := totalCopied - previousCount
Expand Down Expand Up @@ -1372,55 +1378,35 @@ func (mgtr *Migrator) shouldPrintMigrationStatusHint(rule PrintStatusRule, elaps
// `rule` indicates the type of output expected.
// By default the status is written to standard output, but other writers can
// be used as well.
func (mgtr *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
func (mgtr *Migrator) printStatus(rule PrintStatusRule, snap migrationProgressSnapshot, writers ...io.Writer) {
if rule == NoPrintStatusRule {
return
}
writers = append(writers, os.Stdout)

elapsedTime := mgtr.migrationContext.ElapsedTime()
elapsedSeconds := int64(elapsedTime.Seconds())
totalRowsCopied := mgtr.migrationContext.GetTotalRowsCopied()
rowsEstimate := atomic.LoadInt64(&mgtr.migrationContext.RowsEstimate) + atomic.LoadInt64(&mgtr.migrationContext.RowsDeltaEstimate)
if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 {
// Done copying rows. The totalRowsCopied value is the de-facto number of rows,
// and there is no further need to keep updating the value.
rowsEstimate = totalRowsCopied
}

// we take the opportunity to update migration context with progressPct
progressPct := mgtr.getProgressPercent(rowsEstimate)
mgtr.migrationContext.SetProgressPct(progressPct)

// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
if mgtr.shouldPrintMigrationStatusHint(rule, elapsedSeconds) {
if mgtr.shouldPrintMigrationStatusHint(rule, snap.elapsedSeconds) {
mgtr.printMigrationStatusHint(writers...)
}

// Get state + ETA
state, eta, etaDuration := mgtr.getMigrationStateAndETA(rowsEstimate)
mgtr.migrationContext.SetETADuration(etaDuration)

if !mgtr.shouldPrintStatus(rule, elapsedSeconds, etaDuration) {
if !mgtr.shouldPrintStatus(rule, snap.elapsedSeconds, snap.etaDuration) {
return
}

currentBinlogCoordinates := mgtr.eventsStreamer.GetCurrentBinlogCoordinates()

status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s",
totalRowsCopied, rowsEstimate, progressPct,
atomic.LoadInt64(&mgtr.migrationContext.TotalDMLEventsApplied),
len(mgtr.applyEventsQueue), cap(mgtr.applyEventsQueue),
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(mgtr.migrationContext.ElapsedRowCopyTime()),
currentBinlogCoordinates.DisplayString(),
mgtr.migrationContext.GetCurrentLagDuration().Seconds(),
mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(),
state,
eta,
snap.totalRowsCopied, snap.rowsEstimate, snap.progressPct,
snap.dmlApplied,
snap.applyEventsBacklog, snap.applyEventsCapacity,
base.PrettifyDurationOutput(snap.elapsedTime), base.PrettifyDurationOutput(snap.elapsedRowCopyTime),
snap.streamerBinlogPosition,
snap.replicationLagSeconds,
snap.heartbeatLagSeconds,
snap.state,
snap.eta,
)
mgtr.applier.WriteChangelog(
fmt.Sprintf("copy iteration %d at %d", mgtr.migrationContext.GetIteration(), time.Now().Unix()),
state,
snap.state,
)
w := io.MultiWriter(writers...)
fmt.Fprintln(w, status)
Expand All @@ -1434,7 +1420,7 @@ func (mgtr *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
mgtr.migrationContext.Log.Info(strings.Replace(status, "%", "%%", 1))

hooksStatusIntervalSec := mgtr.migrationContext.HooksStatusIntervalSec
if hooksStatusIntervalSec > 0 && elapsedSeconds%hooksStatusIntervalSec == 0 {
if hooksStatusIntervalSec > 0 && snap.elapsedSeconds%hooksStatusIntervalSec == 0 {
mgtr.hooksExecutor.OnStatus(status)
}
}
Expand Down
74 changes: 74 additions & 0 deletions go/logic/progress_snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2022 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package logic

import (
"sync/atomic"
"time"
)

// migrationProgressSnapshot captures row-copy, DML, backlog, and lag at a single point in time
// for status output.
type migrationProgressSnapshot struct {
totalRowsCopied int64
rowsEstimate int64
progressPct float64
dmlApplied int64
applyEventsBacklog int
applyEventsCapacity int
state string
eta string
etaDuration time.Duration
elapsedTime time.Duration
elapsedRowCopyTime time.Duration
elapsedSeconds int64
streamerBinlogPosition string
replicationLagSeconds float64
heartbeatLagSeconds float64
}

func (mgtr *Migrator) migrationProgressSnapshot() migrationProgressSnapshot {
totalRowsCopied := mgtr.migrationContext.GetTotalRowsCopied()
rowsEstimate := atomic.LoadInt64(&mgtr.migrationContext.RowsEstimate) + atomic.LoadInt64(&mgtr.migrationContext.RowsDeltaEstimate)
if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 {
// Done copying rows. The totalRowsCopied value is the de-facto number of rows,
// and there is no further need to keep updating the value.
rowsEstimate = totalRowsCopied
}
progressPct := mgtr.getProgressPercent(rowsEstimate)
state, eta, etaDuration := mgtr.getMigrationStateAndETA(rowsEstimate)
elapsedTime := mgtr.migrationContext.ElapsedTime()

streamerBinlogPosition := ""
if mgtr.eventsStreamer != nil {
streamerBinlogPosition = mgtr.eventsStreamer.GetCurrentBinlogCoordinates().DisplayString()
}

return migrationProgressSnapshot{
totalRowsCopied: totalRowsCopied,
rowsEstimate: rowsEstimate,
progressPct: progressPct,
dmlApplied: atomic.LoadInt64(&mgtr.migrationContext.TotalDMLEventsApplied),
applyEventsBacklog: len(mgtr.applyEventsQueue),
applyEventsCapacity: cap(mgtr.applyEventsQueue),
state: state,
eta: eta,
etaDuration: etaDuration,
elapsedTime: elapsedTime,
elapsedRowCopyTime: mgtr.migrationContext.ElapsedRowCopyTime(),
elapsedSeconds: int64(elapsedTime.Seconds()),
streamerBinlogPosition: streamerBinlogPosition,
replicationLagSeconds: mgtr.migrationContext.GetCurrentLagDuration().Seconds(),
heartbeatLagSeconds: mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(),
}
}

func (mgtr *Migrator) sampleMigrationProgress() migrationProgressSnapshot {
snap := mgtr.migrationProgressSnapshot()
mgtr.migrationContext.SetProgressPct(snap.progressPct)
mgtr.migrationContext.SetETADuration(snap.etaDuration)
return snap
}
76 changes: 76 additions & 0 deletions go/logic/progress_snapshot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Copyright 2022 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package logic

import (
"bytes"
"io"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/github/gh-ost/go/base"
)

func TestMigrationProgressSnapshotCapturesBacklogAndLag(t *testing.T) {
ctx := base.NewMigrationContext()
migrator := NewMigrator(ctx, "test")
atomic.StoreInt64(&ctx.TotalRowsCopied, 100)
atomic.StoreInt64(&ctx.CurrentLag, int64(3*time.Second))
ctx.SetLastHeartbeatOnChangelogTime(time.Now().Add(-2 * time.Second))

snap := migrator.migrationProgressSnapshot()
assert.Equal(t, 0, snap.applyEventsBacklog)
assert.Equal(t, cap(migrator.applyEventsQueue), snap.applyEventsCapacity)
assert.InDelta(t, 3.0, snap.replicationLagSeconds, 0.01)
assert.InDelta(t, 2.0, snap.heartbeatLagSeconds, 0.5)
assert.Empty(t, snap.streamerBinlogPosition)
}

func TestMigrationProgressSnapshotWhenRowCopyComplete(t *testing.T) {
ctx := base.NewMigrationContext()
migrator := NewMigrator(ctx, "test")
atomic.StoreInt64(&ctx.TotalRowsCopied, 5000)
atomic.StoreInt64(&ctx.RowsEstimate, 10000)
atomic.StoreInt64(&migrator.rowCopyCompleteFlag, 1)

snap := migrator.migrationProgressSnapshot()
assert.Equal(t, int64(5000), snap.totalRowsCopied)
assert.Equal(t, int64(5000), snap.rowsEstimate)
assert.InDelta(t, 100.0, snap.progressPct, 0.01)
}

func TestReportStatusSamplesProgress(t *testing.T) {
ctx := base.NewMigrationContext()
atomic.StoreInt64(&ctx.TotalRowsCopied, 1000)
atomic.StoreInt64(&ctx.RowsEstimate, 5000)

migrator := NewMigrator(ctx, "test")
migrator.reportStatus(NoPrintStatusRule, io.Discard)
assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01)
}

func TestReportStatusUpdatesContextWhenPrintSuppressed(t *testing.T) {
ctx := base.NewMigrationContext()
ctx.StartTime = time.Now().Add(-99 * time.Second)
atomic.StoreInt64(&ctx.TotalRowsCopied, 1000)
atomic.StoreInt64(&ctx.RowsEstimate, 5000)
atomic.StoreInt64(&ctx.EtaRowsPerSecond, 1)

migrator := NewMigrator(ctx, "test")
snap := migrator.migrationProgressSnapshot()
require.False(t, migrator.shouldPrintStatus(HeuristicPrintStatusRule, snap.elapsedSeconds, snap.etaDuration))

var buf bytes.Buffer
migrator.printStatus(HeuristicPrintStatusRule, snap, &buf)
assert.Empty(t, buf.String(), "heuristic rule should suppress status line output")

migrator.reportStatus(HeuristicPrintStatusRule, io.Discard)
assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01)
}
Loading