Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ type MigrationContext struct {
AbortError error
abortMutex *sync.Mutex

Metrics metrics.MemStatsGaugeEmitter
Metrics metrics.Emitter

OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
Expand Down
30 changes: 24 additions & 6 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator {

// sleepWhileTrue sleeps in one-second intervals until the given function returns 'false'
// (or fails with error). Each interval is recorded as a sleep metric under the given stage.
func (mgtr *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
func (mgtr *Migrator) sleepWhileTrue(stage string, operation func() (bool, error)) error {
for {
// Check for abort before continuing
if err := mgtr.checkAbort(); err != nil {
Expand All @@ -143,6 +143,7 @@ func (mgtr *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
if !shouldSleep {
return nil
}
metrics.RecordSleep(mgtr.migrationContext.Metrics, stage, time.Second)
time.Sleep(time.Second)
}
}
Expand All @@ -166,7 +167,9 @@ func (mgtr *Migrator) retryOperation(operation func() error, notFatalHint ...boo
for i := 0; i < maxRetries; i++ {
if i != 0 {
// sleep after previous iteration
RetrySleepFn(1 * time.Second)
sleepDuration := 1 * time.Second
metrics.RecordSleep(mgtr.migrationContext.Metrics, "retry_backoff", sleepDuration)
RetrySleepFn(sleepDuration)
}
// Check for abort/context cancellation before each retry
if abortErr := mgtr.checkAbort(); abortErr != nil {
Expand Down Expand Up @@ -207,7 +210,9 @@ func (mgtr *Migrator) retryOperationWithExponentialBackoff(operation func() erro
)

if i != 0 {
RetrySleepFn(time.Duration(interval) * time.Second)
sleepDuration := time.Duration(interval) * time.Second
metrics.RecordSleep(mgtr.migrationContext.Metrics, "retry_backoff", sleepDuration)
RetrySleepFn(sleepDuration)
}
// Check for abort/context cancellation before each retry
if abortErr := mgtr.checkAbort(); abortErr != nil {
Expand Down Expand Up @@ -842,6 +847,7 @@ func (mgtr *Migrator) cutOver() (err error) {
mgtr.migrationContext.MarkPointOfInterest()
mgtr.migrationContext.Log.Debugf("checking for cut-over postpone")
if err := mgtr.sleepWhileTrue(
"cut_over_postpone",
func() (bool, error) {
heartbeatLag := mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog()
maxLagMillisecondsThrottle := time.Duration(atomic.LoadInt64(&mgtr.migrationContext.MaxLagMillisecondsThrottleThreshold)) * time.Millisecond
Expand Down Expand Up @@ -1168,7 +1174,7 @@ func (mgtr *Migrator) initiateInspector() (err error) {
return nil
}

// emitProgressMetrics emits StatsD gauges from a progress snapshot.
// emitProgressMetrics emits StatsD progress and lag gauges from a progress snapshot.
func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) {
metrics.EmitProgressGauges(
mgtr.migrationContext.Metrics,
Expand All @@ -1181,6 +1187,13 @@ func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) {
snap.applyEventsBacklog,
snap.applyEventsCapacity,
)
isThrottled, _, _ := mgtr.migrationContext.IsThrottled()
metrics.EmitLagGauges(
mgtr.migrationContext.Metrics,
snap.replicationLagSeconds,
snap.heartbeatLagSeconds,
isThrottled,
)
}

// reportStatus samples progress, emits metrics, and optionally prints status output.
Expand Down Expand Up @@ -1754,7 +1767,9 @@ func (mgtr *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) {
return chk, err
}
mgtr.applier.CurrentCoordinatesMutex.Unlock()
time.Sleep(500 * time.Millisecond)
sleepDuration := 500 * time.Millisecond
metrics.RecordSleep(mgtr.migrationContext.Metrics, "replica_wait", sleepDuration)
time.Sleep(sleepDuration)
}
}

Expand Down Expand Up @@ -1859,7 +1874,10 @@ func (mgtr *Migrator) executeWriteFuncs() error {
copyRowsDuration := time.Since(copyRowsStartTime)
sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds())
sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond
time.Sleep(sleepTime)
if sleepTime > 0 {
metrics.RecordSleep(mgtr.migrationContext.Metrics, "chunk_throttle", sleepTime)
time.Sleep(sleepTime)
}
}
}
default:
Expand Down
47 changes: 38 additions & 9 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,19 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) {
// progressGaugeSpy is a test double that records every Gauge call it receives
// so tests can assert on the emitted gauge names, values and tags.
// Gauge appends to all three slices on each call, so index i in each slice
// describes the same call.
type progressGaugeSpy struct {
// names holds the metric name passed to each recorded Gauge call.
names []string
// values holds the gauge value passed to each recorded Gauge call.
values []float64
// tags holds a copy of the tags passed to each recorded Gauge call.
tags [][]string
}

func (s *progressGaugeSpy) Gauge(name string, value float64, _ ...string) {
// Gauge records a single gauge emission: the metric name, its value, and a
// private copy of the tags, each appended to the matching slice on the spy.
func (s *progressGaugeSpy) Gauge(name string, value float64, tags ...string) {
	// Copy the variadic tags so the recorded entry does not share a backing
	// array with the caller's slice. With no tags the recorded entry stays nil.
	var recorded []string
	recorded = append(recorded, tags...)

	s.tags = append(s.tags, recorded)
	s.values = append(s.values, value)
	s.names = append(s.names, name)
}

// Count exists so the spy provides the Count method of the metrics emitter
// interface. The spy only tracks gauges, so counter emissions are discarded.
func (s *progressGaugeSpy) Count(_ string, _ int64, _ ...string) {}

// Histogram exists so the spy provides the Histogram method of the metrics
// emitter interface. The spy only tracks gauges, so histogram samples are
// discarded.
func (s *progressGaugeSpy) Histogram(_ string, _ float64, _ ...string) {}

func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) {
Expand All @@ -427,24 +435,26 @@ func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) {

migrator := NewMigrator(ctx, "test")
migrator.reportStatus(NoPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, []string{
"row_copy.rows_copied",
"row_copy.rows_estimate",
"dml.events_applied",
"binlog.backlog_size",
"binlog.backlog_capacity",
"binlog.backlog_utilization",
"lag.replication_seconds",
"lag.heartbeat_seconds",
}, spy.names)
assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01)

spy.names = nil
spy.values = nil
atomic.StoreInt64(&ctx.TotalDMLEventsApplied, 100)
migrator.reportStatus(NoPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
require.Len(t, spy.names, 8)
assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
}

func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) {
Expand All @@ -463,7 +473,7 @@ func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) {
migrator.reportStatus(NoPrintStatusRule, io.Discard)

capacity := float64(cap(migrator.applyEventsQueue))
require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, float64(2), spy.values[3])
assert.Equal(t, capacity, spy.values[4])
assert.InDelta(t, 2/capacity, spy.values[5], 1e-9)
Expand All @@ -480,7 +490,7 @@ func TestReportStatusEmitsGaugesWhenRowCopyComplete(t *testing.T) {
atomic.StoreInt64(&migrator.rowCopyCompleteFlag, 1)
migrator.reportStatus(NoPrintStatusRule, io.Discard)

require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, float64(5000), spy.values[0])
assert.Equal(t, float64(5000), spy.values[1], "rows_estimate tracks rows_copied when row copy is complete")
}
Expand All @@ -500,8 +510,27 @@ func TestReportStatusEmitsGaugesWhenPrintSuppressed(t *testing.T) {
require.False(t, migrator.shouldPrintStatus(HeuristicPrintStatusRule, snap.elapsedSeconds, etaDuration))

migrator.reportStatus(HeuristicPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
require.Len(t, spy.names, 8)
assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
}

// TestReportStatusEmitsLagGaugesWhenThrottled verifies that, when the migration
// context is marked as throttled, reportStatus emits the replication and
// heartbeat lag gauges at positions 6 and 7 (after the progress gauges) and
// tags each of them with exactly "throttled:true".
func TestReportStatusEmitsLagGaugesWhenThrottled(t *testing.T) {
	spy := &progressGaugeSpy{}
	ctx := base.NewMigrationContext()
	ctx.Metrics = spy
	ctx.SetThrottled(true, "max-lag-millis", base.NoThrottleReasonHint)
	atomic.StoreInt64(&ctx.CurrentLag, int64(5*time.Second))
	ctx.SetLastHeartbeatOnChangelogTime(time.Now().Add(-4 * time.Second))

	migrator := NewMigrator(ctx, "test")
	migrator.reportStatus(NoPrintStatusRule, io.Discard)

	// Guard the fixed indices used below so a short emission fails cleanly.
	require.GreaterOrEqual(t, len(spy.names), 8)

	lagGauges := []struct {
		index int
		name  string
	}{
		{index: 6, name: "lag.replication_seconds"},
		{index: 7, name: "lag.heartbeat_seconds"},
	}
	for _, gauge := range lagGauges {
		assert.Equal(t, gauge.name, spy.names[gauge.index])
		// Check the tag count for every lag gauge before indexing into it;
		// an untagged gauge must fail the test rather than panic.
		require.Len(t, spy.tags[gauge.index], 1)
		assert.Equal(t, "throttled:true", spy.tags[gauge.index][0])
	}
}

func TestMigratorShouldPrintStatus(t *testing.T) {
Expand Down
31 changes: 0 additions & 31 deletions go/metrics/binlog_backlog.go

This file was deleted.

53 changes: 0 additions & 53 deletions go/metrics/binlog_backlog_test.go

This file was deleted.

15 changes: 15 additions & 0 deletions go/metrics/catalog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# gh-ost metrics catalog
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was waiting to do this until the end, but sure: https://github.com/Shopify/schema-migrations/issues/5465


Metrics are emitted with the `gh_ost.` namespace.

## Sleep and wait metrics

- `gh_ost.sleep.duration_milliseconds` — histogram of the duration of each individual sleep/wait interval, in milliseconds, tagged with `stage`.
- `gh_ost.sleep.total_milliseconds` — counter of the cumulative milliseconds spent sleeping/waiting, tagged with `stage`.

### Sleep stages

- `cut_over_postpone` — waiting for cut-over postponement or heartbeat lag preconditions to clear before the final table swap.
- `chunk_throttle` — `--nice-ratio` sleep after a row-copy chunk.
- `retry_backoff` — fixed or exponential backoff sleep between retry attempts of a failed operation.
- `replica_wait` — waiting for streamed coordinates to catch up before checkpointing.
14 changes: 14 additions & 0 deletions go/metrics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ type Client struct {
sd *statsd.Client
}

// Emitter is the set of metric-emitting methods callers depend on. *Client
// implements it, and tests can supply their own implementation so that no UDP
// traffic is needed.
type Emitter interface {
	// Count emits a counter metric with the given name, value and tags.
	Count(name string, value int64, tags ...string)
	// Gauge emits a gauge metric with the given name, value and tags.
	Gauge(name string, value float64, tags ...string)
	// Histogram emits a histogram metric with the given name, value and tags.
	Histogram(name string, value float64, tags ...string)
}

// NewClient connects to addr for StatsD. If addr is empty, returns Noop and nil error.
// namespace is typically "gh_ost." (metrics are named namespace + short name, e.g. gh_ost.startup).
// tags are global tags applied to every metric (repeatable --statsd-tags).
Expand Down Expand Up @@ -60,6 +67,13 @@ func (c *Client) Count(name string, value int64, tags ...string) {
_ = c.sd.Count(name, value, tags, 1.0)
}

// Histogram forwards a histogram sample with the given name, value and tags to
// the underlying StatsD client, passing 1.0 as the final argument (presumably
// the sample rate; confirm against the statsd client in use). It does nothing
// when the client has no underlying StatsD connection (c.sd is nil).
func (c *Client) Histogram(name string, value float64, tags ...string) {
	if c.sd != nil {
		// Emission is best-effort: the send error is deliberately discarded.
		_ = c.sd.Histogram(name, value, tags, 1.0)
	}
}

// Close flushes buffered metrics; safe for Noop.
func (c *Client) Close() error {
if c.sd == nil {
Expand Down
Loading
Loading