diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 5ed2e2d9a..5c51505e5 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1168,7 +1168,7 @@ func (mgtr *Migrator) initiateInspector() (err error) { return nil } -// emitProgressMetrics emits StatsD gauges from a progress snapshot. +// emitProgressMetrics emits StatsD gauges func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) { metrics.EmitProgressGauges( mgtr.migrationContext.Metrics, @@ -1181,6 +1181,13 @@ func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) { snap.applyEventsBacklog, snap.applyEventsCapacity, ) + isThrottled, _, _ := mgtr.migrationContext.IsThrottled() + metrics.EmitLagGauges( + mgtr.migrationContext.Metrics, + snap.replicationLagSeconds, + snap.heartbeatLagSeconds, + isThrottled, + ) } // reportStatus samples progress, emits metrics, and optionally prints status output. diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 31e161b5e..74b09a47c 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -410,11 +410,13 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) { type progressGaugeSpy struct { names []string values []float64 + tags [][]string } -func (s *progressGaugeSpy) Gauge(name string, value float64, _ ...string) { +func (s *progressGaugeSpy) Gauge(name string, value float64, tags ...string) { s.names = append(s.names, name) s.values = append(s.values, value) + s.tags = append(s.tags, append([]string(nil), tags...)) } func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) { @@ -427,7 +429,7 @@ func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) { migrator := NewMigrator(ctx, "test") migrator.reportStatus(NoPrintStatusRule, io.Discard) - require.Len(t, spy.names, 6) + require.Len(t, spy.names, 8) assert.Equal(t, []string{ "row_copy.rows_copied", "row_copy.rows_estimate", @@ -435,16 +437,18 @@ func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) { "binlog.backlog_size", "binlog.backlog_capacity", "binlog.backlog_utilization", + "lag.replication_seconds", + "lag.heartbeat_seconds", }, spy.names) - assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values) + assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6]) assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01) spy.names = nil spy.values = nil atomic.StoreInt64(&ctx.TotalDMLEventsApplied, 100) migrator.reportStatus(NoPrintStatusRule, io.Discard) - require.Len(t, spy.names, 6) - assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values) + require.Len(t, spy.names, 8) + assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6]) } func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) { @@ -463,7 +467,7 @@ func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) { migrator.reportStatus(NoPrintStatusRule, io.Discard) capacity := float64(cap(migrator.applyEventsQueue)) - require.Len(t, spy.names, 6) + require.Len(t, spy.names, 8) assert.Equal(t, float64(2), spy.values[3]) assert.Equal(t, capacity, spy.values[4]) assert.InDelta(t, 2/capacity, spy.values[5], 1e-9) @@ -480,7 +484,7 @@ func TestReportStatusEmitsGaugesWhenRowCopyComplete(t *testing.T) { atomic.StoreInt64(&migrator.rowCopyCompleteFlag, 1) migrator.reportStatus(NoPrintStatusRule, io.Discard) - require.Len(t, spy.names, 6) + require.Len(t, spy.names, 8) assert.Equal(t, float64(5000), spy.values[0]) assert.Equal(t, float64(5000), spy.values[1], "rows_estimate tracks rows_copied when row copy is complete") } @@ -500,8 +504,27 @@ func TestReportStatusEmitsGaugesWhenPrintSuppressed(t *testing.T) { require.False(t, migrator.shouldPrintStatus(HeuristicPrintStatusRule, snap.elapsedSeconds, etaDuration)) migrator.reportStatus(HeuristicPrintStatusRule, io.Discard) - require.Len(t, spy.names, 6) - assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values) + require.Len(t, spy.names, 8) + assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6]) +} + +func TestReportStatusEmitsLagGaugesWhenThrottled(t *testing.T) { + spy := &progressGaugeSpy{} + ctx := base.NewMigrationContext() + ctx.Metrics = spy + ctx.SetThrottled(true, "max-lag-millis", base.NoThrottleReasonHint) + atomic.StoreInt64(&ctx.CurrentLag, int64(5*time.Second)) + ctx.SetLastHeartbeatOnChangelogTime(time.Now().Add(-4 * time.Second)) + + migrator := NewMigrator(ctx, "test") + migrator.reportStatus(NoPrintStatusRule, io.Discard) + + require.GreaterOrEqual(t, len(spy.names), 8) + assert.Equal(t, "lag.replication_seconds", spy.names[6]) + assert.Equal(t, "lag.heartbeat_seconds", spy.names[7]) + require.Len(t, spy.tags[6], 1) + assert.Equal(t, "throttled:true", spy.tags[6][0]) + assert.Equal(t, "throttled:true", spy.tags[7][0]) } func TestMigratorShouldPrintStatus(t *testing.T) { diff --git a/go/metrics/client.go b/go/metrics/client.go index ed6acc096..4600df8e9 100644 --- a/go/metrics/client.go +++ b/go/metrics/client.go @@ -21,6 +21,11 @@ type Client struct { sd *statsd.Client } +// MemStatsGaugeEmitter is implemented by *Client; used for tests without UDP. +type MemStatsGaugeEmitter interface { + Gauge(name string, value float64, tags ...string) +} + // NewClient connects to addr for StatsD. If addr is empty, returns Noop and nil error. // namespace is typically "gh_ost." (metrics are named namespace + short name, e.g. gh_ost.startup). // tags are global tags applied to every metric (repeatable --statsd-tags). diff --git a/go/metrics/go_runtime.go b/go/metrics/go_runtime.go index 24ae2c6b5..021e987bf 100644 --- a/go/metrics/go_runtime.go +++ b/go/metrics/go_runtime.go @@ -11,11 +11,6 @@ import ( "time" ) -// MemStatsGaugeEmitter is implemented by *Client; used for tests without UDP. -type MemStatsGaugeEmitter interface { - Gauge(name string, value float64, tags ...string) -} - // EmitGoRuntimeGauges emits gh_ost.go_runtime.* gauges (namespace is applied by the client). // m and numGoroutine are typically from runtime.ReadMemStats and runtime.NumGoroutine. func EmitGoRuntimeGauges(emit MemStatsGaugeEmitter, m *runtime.MemStats, numGoroutine int) { diff --git a/go/metrics/go_runtime_test.go b/go/metrics/go_runtime_test.go index 24811206b..0a8a5d7e2 100644 --- a/go/metrics/go_runtime_test.go +++ b/go/metrics/go_runtime_test.go @@ -15,11 +15,13 @@ import ( type gaugeSpy struct { names []string values []float64 + tags [][]string } -func (g *gaugeSpy) Gauge(name string, value float64, _ ...string) { +func (g *gaugeSpy) Gauge(name string, value float64, tags ...string) { g.names = append(g.names, name) g.values = append(g.values, value) + g.tags = append(g.tags, append([]string(nil), tags...)) } func TestEmitGoRuntimeGauges(t *testing.T) { diff --git a/go/metrics/lag.go b/go/metrics/lag.go new file mode 100644 index 000000000..b6dc6995a --- /dev/null +++ b/go/metrics/lag.go @@ -0,0 +1,23 @@ +/* + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package metrics + +import "fmt" + +// EmitLagGauges emits replication and heartbeat lag gauges (namespace is applied by the client): +// gh_ost.lag.replication_seconds, gh_ost.lag.heartbeat_seconds, each tagged throttled:true|false. +// +// These are point-in-time readings each status tick (not a distribution), so gauges are used +// rather than histograms — DogStatsD histogram aggregation exposes count/max series that do not +// match the log line lag values in Prometheus/Grafana. +func EmitLagGauges(emit MemStatsGaugeEmitter, replicationLagSeconds, heartbeatLagSeconds float64, throttled bool) { + if emit == nil { + return + } + tags := []string{fmt.Sprintf("throttled:%t", throttled)} + emit.Gauge("lag.replication_seconds", replicationLagSeconds, tags...) + emit.Gauge("lag.heartbeat_seconds", heartbeatLagSeconds, tags...) +} diff --git a/go/metrics/lag_test.go b/go/metrics/lag_test.go new file mode 100644 index 000000000..c330344e3 --- /dev/null +++ b/go/metrics/lag_test.go @@ -0,0 +1,47 @@ +/* + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package metrics + +import "testing" + +func TestEmitLagGauges_notThrottled(t *testing.T) { + spy := &gaugeSpy{} + EmitLagGauges(spy, 2.5, 1.25, false) + + wantNames := []string{"lag.replication_seconds", "lag.heartbeat_seconds"} + wantVals := []float64{2.5, 1.25} + wantTags := []string{"throttled:false"} + + if len(spy.names) != len(wantNames) { + t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) + } + for i := range wantNames { + if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { + t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) + } + if len(spy.tags[i]) != 1 || spy.tags[i][0] != wantTags[0] { + t.Fatalf("[%d] got tags %v want [%s]", i, spy.tags[i], wantTags[0]) + } + } +} + +func TestEmitLagGauges_throttled(t *testing.T) { + spy := &gaugeSpy{} + EmitLagGauges(spy, 4.0, 3.0, true) + + if len(spy.names) != 2 { + t.Fatalf("got %d gauges, want 2", len(spy.names)) + } + for i := range spy.names { + if len(spy.tags[i]) != 1 || spy.tags[i][0] != "throttled:true" { + t.Fatalf("[%d] got tags %v want [throttled:true]", i, spy.tags[i]) + } + } +} + +func TestEmitLagGauges_nilSafe(t *testing.T) { + EmitLagGauges(nil, 1, 2, false) +}