From 32e9a240ed446d4c1f8ac7eca97e3fe87e6bd530 Mon Sep 17 00:00:00 2001 From: Patrick Begley Date: Thu, 28 May 2026 10:57:09 -0400 Subject: [PATCH 1/6] Emit replication and heartbeat lag --- go/base/context.go | 2 +- go/logic/migrator.go | 9 +++++- go/logic/migrator_test.go | 50 +++++++++++++++++++++++++++++++++ go/metrics/client.go | 23 +++++++++++++++ go/metrics/go_runtime.go | 5 ---- go/metrics/lag.go | 19 +++++++++++++ go/metrics/lag_test.go | 59 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 160 insertions(+), 7 deletions(-) create mode 100644 go/metrics/lag.go create mode 100644 go/metrics/lag_test.go diff --git a/go/base/context.go b/go/base/context.go index 5a06d0d4d..4d0c3b09d 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -238,7 +238,7 @@ type MigrationContext struct { AbortError error abortMutex *sync.Mutex - Metrics metrics.MemStatsGaugeEmitter + Metrics metrics.Emitter OriginalTableColumnsOnApplier *sql.ColumnList OriginalTableColumns *sql.ColumnList diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 5ed2e2d9a..f19098eb8 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1168,7 +1168,7 @@ func (mgtr *Migrator) initiateInspector() (err error) { return nil } -// emitProgressMetrics emits StatsD gauges from a progress snapshot. +// emitProgressMetrics emits StatsD gauges func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) { metrics.EmitProgressGauges( mgtr.migrationContext.Metrics, @@ -1181,6 +1181,13 @@ func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) { snap.applyEventsBacklog, snap.applyEventsCapacity, ) + isThrottled, _, _ := mgtr.migrationContext.IsThrottled() + metrics.EmitLagHistograms( + mgtr.migrationContext.Metrics, + snap.replicationLagSeconds, + snap.heartbeatLagSeconds, + isThrottled, + ) } // reportStatus samples progress, emits metrics, and optionally prints status output. diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 31e161b5e..31cb642dd 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -417,6 +417,21 @@ func (s *progressGaugeSpy) Gauge(name string, value float64, _ ...string) { s.values = append(s.values, value) } +func (s *progressGaugeSpy) Histogram(string, float64, ...string) {} + +type statusMetricsSpy struct { + progressGaugeSpy + histogramNames []string + histogramValues []float64 + histogramTags [][]string +} + +func (s *statusMetricsSpy) Histogram(name string, value float64, tags ...string) { + s.histogramNames = append(s.histogramNames, name) + s.histogramValues = append(s.histogramValues, value) + s.histogramTags = append(s.histogramTags, append([]string(nil), tags...)) +} + func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) { spy := &progressGaugeSpy{} ctx := base.NewMigrationContext() @@ -504,6 +519,41 @@ func TestReportStatusEmitsGaugesWhenPrintSuppressed(t *testing.T) { assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values) } +func TestReportStatusEmitsLagHistogramsWhenNotThrottled(t *testing.T) { + spy := &statusMetricsSpy{} + ctx := base.NewMigrationContext() + ctx.Metrics = spy + atomic.StoreInt64(&ctx.CurrentLag, int64(3*time.Second)) + ctx.SetLastHeartbeatOnChangelogTime(time.Now().Add(-2 * time.Second)) + + migrator := NewMigrator(ctx, "test") + migrator.reportStatus(NoPrintStatusRule, io.Discard) + + require.Len(t, spy.histogramNames, 2) + assert.Equal(t, []string{"lag.replication_seconds", "lag.heartbeat_seconds"}, spy.histogramNames) + assert.InDelta(t, 3.0, spy.histogramValues[0], 0.01) + assert.InDelta(t, 2.0, spy.histogramValues[1], 0.5) + require.Len(t, spy.histogramTags[0], 1) + assert.Equal(t, "throttled:false", spy.histogramTags[0][0]) + assert.Equal(t, "throttled:false", spy.histogramTags[1][0]) +} + +func TestReportStatusEmitsLagHistogramsWhenThrottled(t *testing.T) { + spy := &statusMetricsSpy{} + ctx := base.NewMigrationContext() + ctx.Metrics = spy + ctx.SetThrottled(true, "max-lag-millis", base.NoThrottleReasonHint) + atomic.StoreInt64(&ctx.CurrentLag, int64(5*time.Second)) + ctx.SetLastHeartbeatOnChangelogTime(time.Now().Add(-4 * time.Second)) + + migrator := NewMigrator(ctx, "test") + migrator.reportStatus(NoPrintStatusRule, io.Discard) + + require.Len(t, spy.histogramNames, 2) + assert.Equal(t, "throttled:true", spy.histogramTags[0][0]) + assert.Equal(t, "throttled:true", spy.histogramTags[1][0]) +} + func TestMigratorShouldPrintStatus(t *testing.T) { migrationContext := base.NewMigrationContext() migrator := NewMigrator(migrationContext, "1.2.3") diff --git a/go/metrics/client.go b/go/metrics/client.go index ed6acc096..e7953ae82 100644 --- a/go/metrics/client.go +++ b/go/metrics/client.go @@ -21,6 +21,22 @@ type Client struct { sd *statsd.Client } +// MemStatsGaugeEmitter is implemented by *Client; used for tests without UDP. +type MemStatsGaugeEmitter interface { + Gauge(name string, value float64, tags ...string) +} + +// HistogramEmitter is implemented by *Client; used for tests without UDP. +type HistogramEmitter interface { + Histogram(name string, value float64, tags ...string) +} + +// Emitter combines gauge and histogram emission for migration status metrics. +type Emitter interface { + MemStatsGaugeEmitter + HistogramEmitter +} + // NewClient connects to addr for StatsD. If addr is empty, returns Noop and nil error. // namespace is typically "gh_ost." (metrics are named namespace + short name, e.g. gh_ost.startup). // tags are global tags applied to every metric (repeatable --statsd-tags). @@ -60,6 +76,13 @@ func (c *Client) Count(name string, value int64, tags ...string) { _ = c.sd.Count(name, value, tags, 1.0) } +func (c *Client) Histogram(name string, value float64, tags ...string) { + if c.sd == nil { + return + } + _ = c.sd.Histogram(name, value, tags, 1.0) +} + // Close flushes buffered metrics; safe for Noop. func (c *Client) Close() error { if c.sd == nil { diff --git a/go/metrics/go_runtime.go b/go/metrics/go_runtime.go index 24ae2c6b5..021e987bf 100644 --- a/go/metrics/go_runtime.go +++ b/go/metrics/go_runtime.go @@ -11,11 +11,6 @@ import ( "time" ) -// MemStatsGaugeEmitter is implemented by *Client; used for tests without UDP. -type MemStatsGaugeEmitter interface { - Gauge(name string, value float64, tags ...string) -} - // EmitGoRuntimeGauges emits gh_ost.go_runtime.* gauges (namespace is applied by the client). // m and numGoroutine are typically from runtime.ReadMemStats and runtime.NumGoroutine. func EmitGoRuntimeGauges(emit MemStatsGaugeEmitter, m *runtime.MemStats, numGoroutine int) { diff --git a/go/metrics/lag.go b/go/metrics/lag.go new file mode 100644 index 000000000..2683aac54 --- /dev/null +++ b/go/metrics/lag.go @@ -0,0 +1,19 @@ +/* + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package metrics + +import "fmt" + +// EmitLagHistograms emits replication and heartbeat lag histograms (namespace is applied by the client): +// gh_ost.lag.replication_seconds, gh_ost.lag.heartbeat_seconds, each tagged throttled:true|false. +func EmitLagHistograms(emit HistogramEmitter, replicationLagSeconds, heartbeatLagSeconds float64, throttled bool) { + if emit == nil { + return + } + tags := []string{fmt.Sprintf("throttled:%t", throttled)} + emit.Histogram("lag.replication_seconds", replicationLagSeconds, tags...) + emit.Histogram("lag.heartbeat_seconds", heartbeatLagSeconds, tags...) +} diff --git a/go/metrics/lag_test.go b/go/metrics/lag_test.go new file mode 100644 index 000000000..2a0b21418 --- /dev/null +++ b/go/metrics/lag_test.go @@ -0,0 +1,59 @@ +/* + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package metrics + +import "testing" + +type histogramSpy struct { + names []string + values []float64 + tags [][]string +} + +func (s *histogramSpy) Histogram(name string, value float64, tags ...string) { + s.names = append(s.names, name) + s.values = append(s.values, value) + s.tags = append(s.tags, append([]string(nil), tags...)) +} + +func TestEmitLagHistograms_notThrottled(t *testing.T) { + spy := &histogramSpy{} + EmitLagHistograms(spy, 2.5, 1.25, false) + + wantNames := []string{"lag.replication_seconds", "lag.heartbeat_seconds"} + wantVals := []float64{2.5, 1.25} + wantTags := []string{"throttled:false"} + + if len(spy.names) != 2 { + t.Fatalf("got %d histograms, want 2", len(spy.names)) + } + for i := range wantNames { + if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { + t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) + } + if len(spy.tags[i]) != 1 || spy.tags[i][0] != wantTags[0] { + t.Fatalf("[%d] got tags %v want [%s]", i, spy.tags[i], wantTags[0]) + } + } +} + +func TestEmitLagHistograms_throttled(t *testing.T) { + spy := &histogramSpy{} + EmitLagHistograms(spy, 4.0, 3.0, true) + + if len(spy.names) != 2 { + t.Fatalf("got %d histograms, want 2", len(spy.names)) + } + for i := range spy.names { + if len(spy.tags[i]) != 1 || spy.tags[i][0] != "throttled:true" { + t.Fatalf("[%d] got tags %v want [throttled:true]", i, spy.tags[i]) + } + } +} + +func TestEmitLagHistograms_nilSafe(t *testing.T) { + EmitLagHistograms(nil, 1, 2, false) +} From 1b13b78395730b9544a550626d7faf3462373fcf Mon Sep 17 00:00:00 2001 From: Patrick Begley Date: Thu, 28 May 2026 10:59:45 -0400 Subject: [PATCH 2/6] pruning comments --- go/metrics/client.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/go/metrics/client.go b/go/metrics/client.go index e7953ae82..283f476d1 100644 --- a/go/metrics/client.go +++ b/go/metrics/client.go @@ -21,17 +21,13 @@ type Client struct { sd *statsd.Client } -// MemStatsGaugeEmitter is implemented by *Client; used for tests without UDP. type MemStatsGaugeEmitter interface { Gauge(name string, value float64, tags ...string) } - -// HistogramEmitter is implemented by *Client; used for tests without UDP. type HistogramEmitter interface { Histogram(name string, value float64, tags ...string) } -// Emitter combines gauge and histogram emission for migration status metrics. type Emitter interface { MemStatsGaugeEmitter HistogramEmitter From 7b25a866dc38c68ceba13b1b225e5d4931ebe398 Mon Sep 17 00:00:00 2001 From: Patrick Begley Date: Thu, 28 May 2026 11:37:41 -0400 Subject: [PATCH 3/6] use gauge not histogram --- go/base/context.go | 2 +- go/logic/migrator.go | 2 +- go/logic/migrator_test.go | 69 +++++++++++------------------------ go/metrics/client.go | 16 +------- go/metrics/go_runtime_test.go | 4 +- go/metrics/lag.go | 12 ++++-- go/metrics/lag_test.go | 34 ++++++----------- 7 files changed, 46 insertions(+), 93 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 4d0c3b09d..5a06d0d4d 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -238,7 +238,7 @@ type MigrationContext struct { AbortError error abortMutex *sync.Mutex - Metrics metrics.Emitter + Metrics metrics.MemStatsGaugeEmitter OriginalTableColumnsOnApplier *sql.ColumnList OriginalTableColumns *sql.ColumnList diff --git a/go/logic/migrator.go b/go/logic/migrator.go index f19098eb8..5c51505e5 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1182,7 +1182,7 @@ func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) { snap.applyEventsCapacity, ) isThrottled, _, _ := mgtr.migrationContext.IsThrottled() - metrics.EmitLagHistograms( + metrics.EmitLagGauges( mgtr.migrationContext.Metrics, snap.replicationLagSeconds, snap.heartbeatLagSeconds, diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 31cb642dd..74b09a47c 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -410,26 +410,13 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) { type progressGaugeSpy struct { names []string values []float64 + tags [][]string } -func (s *progressGaugeSpy) Gauge(name string, value float64, _ ...string) { +func (s *progressGaugeSpy) Gauge(name string, value float64, tags ...string) { s.names = append(s.names, name) s.values = append(s.values, value) -} - -func (s *progressGaugeSpy) Histogram(string, float64, ...string) {} - -type statusMetricsSpy struct { - progressGaugeSpy - histogramNames []string - histogramValues []float64 - histogramTags [][]string -} - -func (s *statusMetricsSpy) Histogram(name string, value float64, tags ...string) { - s.histogramNames = append(s.histogramNames, name) - s.histogramValues = append(s.histogramValues, value) - s.histogramTags = append(s.histogramTags, append([]string(nil), tags...)) + s.tags = append(s.tags, append([]string(nil), tags...)) } func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) { @@ -442,7 +429,7 @@ func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) { migrator := NewMigrator(ctx, "test") migrator.reportStatus(NoPrintStatusRule, io.Discard) - require.Len(t, spy.names, 6) + require.Len(t, spy.names, 8) assert.Equal(t, []string{ "row_copy.rows_copied", "row_copy.rows_estimate", @@ -450,16 +437,18 @@ func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) { "binlog.backlog_size", "binlog.backlog_capacity", "binlog.backlog_utilization", + "lag.replication_seconds", + "lag.heartbeat_seconds", }, spy.names) - assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values) + assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6]) assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01) spy.names = nil spy.values = nil atomic.StoreInt64(&ctx.TotalDMLEventsApplied, 100) migrator.reportStatus(NoPrintStatusRule, io.Discard) - require.Len(t, spy.names, 6) - assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values) + require.Len(t, spy.names, 8) + assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6]) } func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) { @@ -478,7 +467,7 @@ func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) { migrator.reportStatus(NoPrintStatusRule, io.Discard) capacity := float64(cap(migrator.applyEventsQueue)) - require.Len(t, spy.names, 6) + require.Len(t, spy.names, 8) assert.Equal(t, float64(2), spy.values[3]) assert.Equal(t, capacity, spy.values[4]) assert.InDelta(t, 2/capacity, spy.values[5], 1e-9) @@ -495,7 +484,7 @@ func TestReportStatusEmitsGaugesWhenRowCopyComplete(t *testing.T) { atomic.StoreInt64(&migrator.rowCopyCompleteFlag, 1) migrator.reportStatus(NoPrintStatusRule, io.Discard) - require.Len(t, spy.names, 6) + require.Len(t, spy.names, 8) assert.Equal(t, float64(5000), spy.values[0]) assert.Equal(t, float64(5000), spy.values[1], "rows_estimate tracks rows_copied when row copy is complete") } @@ -515,31 +504,12 @@ func TestReportStatusEmitsGaugesWhenPrintSuppressed(t *testing.T) { require.False(t, migrator.shouldPrintStatus(HeuristicPrintStatusRule, snap.elapsedSeconds, etaDuration)) migrator.reportStatus(HeuristicPrintStatusRule, io.Discard) - require.Len(t, spy.names, 6) - assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values) -} - -func TestReportStatusEmitsLagHistogramsWhenNotThrottled(t *testing.T) { - spy := &statusMetricsSpy{} - ctx := base.NewMigrationContext() - ctx.Metrics = spy - atomic.StoreInt64(&ctx.CurrentLag, int64(3*time.Second)) - ctx.SetLastHeartbeatOnChangelogTime(time.Now().Add(-2 * time.Second)) - - migrator := NewMigrator(ctx, "test") - migrator.reportStatus(NoPrintStatusRule, io.Discard) - - require.Len(t, spy.histogramNames, 2) - assert.Equal(t, []string{"lag.replication_seconds", "lag.heartbeat_seconds"}, spy.histogramNames) - assert.InDelta(t, 3.0, spy.histogramValues[0], 0.01) - assert.InDelta(t, 2.0, spy.histogramValues[1], 0.5) - require.Len(t, spy.histogramTags[0], 1) - assert.Equal(t, "throttled:false", spy.histogramTags[0][0]) - assert.Equal(t, "throttled:false", spy.histogramTags[1][0]) + require.Len(t, spy.names, 8) + assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6]) } -func TestReportStatusEmitsLagHistogramsWhenThrottled(t *testing.T) { - spy := &statusMetricsSpy{} +func TestReportStatusEmitsLagGaugesWhenThrottled(t *testing.T) { + spy := &progressGaugeSpy{} ctx := base.NewMigrationContext() ctx.Metrics = spy ctx.SetThrottled(true, "max-lag-millis", base.NoThrottleReasonHint) @@ -549,9 +519,12 @@ func TestReportStatusEmitsLagHistogramsWhenThrottled(t *testing.T) { migrator := NewMigrator(ctx, "test") migrator.reportStatus(NoPrintStatusRule, io.Discard) - require.Len(t, spy.histogramNames, 2) - assert.Equal(t, "throttled:true", spy.histogramTags[0][0]) - assert.Equal(t, "throttled:true", spy.histogramTags[1][0]) + require.GreaterOrEqual(t, len(spy.names), 8) + assert.Equal(t, "lag.replication_seconds", spy.names[6]) + assert.Equal(t, "lag.heartbeat_seconds", spy.names[7]) + require.Len(t, spy.tags[6], 1) + assert.Equal(t, "throttled:true", spy.tags[6][0]) + assert.Equal(t, "throttled:true", spy.tags[7][0]) } func TestMigratorShouldPrintStatus(t *testing.T) { diff --git a/go/metrics/client.go b/go/metrics/client.go index 283f476d1..4600df8e9 100644 --- a/go/metrics/client.go +++ b/go/metrics/client.go @@ -21,17 +21,10 @@ type Client struct { sd *statsd.Client } +// MemStatsGaugeEmitter is implemented by *Client; used for tests without UDP. type MemStatsGaugeEmitter interface { Gauge(name string, value float64, tags ...string) } -type HistogramEmitter interface { - Histogram(name string, value float64, tags ...string) -} - -type Emitter interface { - MemStatsGaugeEmitter - HistogramEmitter -} // NewClient connects to addr for StatsD. If addr is empty, returns Noop and nil error. // namespace is typically "gh_ost." (metrics are named namespace + short name, e.g. gh_ost.startup). @@ -72,13 +65,6 @@ func (c *Client) Count(name string, value int64, tags ...string) { _ = c.sd.Count(name, value, tags, 1.0) } -func (c *Client) Histogram(name string, value float64, tags ...string) { - if c.sd == nil { - return - } - _ = c.sd.Histogram(name, value, tags, 1.0) -} - // Close flushes buffered metrics; safe for Noop. func (c *Client) Close() error { if c.sd == nil { diff --git a/go/metrics/go_runtime_test.go b/go/metrics/go_runtime_test.go index 24811206b..0a8a5d7e2 100644 --- a/go/metrics/go_runtime_test.go +++ b/go/metrics/go_runtime_test.go @@ -15,11 +15,13 @@ import ( type gaugeSpy struct { names []string values []float64 + tags [][]string } -func (g *gaugeSpy) Gauge(name string, value float64, _ ...string) { +func (g *gaugeSpy) Gauge(name string, value float64, tags ...string) { g.names = append(g.names, name) g.values = append(g.values, value) + g.tags = append(g.tags, append([]string(nil), tags...)) } func TestEmitGoRuntimeGauges(t *testing.T) { diff --git a/go/metrics/lag.go b/go/metrics/lag.go index 2683aac54..b6dc6995a 100644 --- a/go/metrics/lag.go +++ b/go/metrics/lag.go @@ -7,13 +7,17 @@ package metrics import "fmt" -// EmitLagHistograms emits replication and heartbeat lag histograms (namespace is applied by the client): +// EmitLagGauges emits replication and heartbeat lag gauges (namespace is applied by the client): // gh_ost.lag.replication_seconds, gh_ost.lag.heartbeat_seconds, each tagged throttled:true|false. -func EmitLagHistograms(emit HistogramEmitter, replicationLagSeconds, heartbeatLagSeconds float64, throttled bool) { +// +// These are point-in-time readings each status tick (not a distribution), so gauges are used +// rather than histograms — DogStatsD histogram aggregation exposes count/max series that do not +// match the log line lag values in Prometheus/Grafana. +func EmitLagGauges(emit MemStatsGaugeEmitter, replicationLagSeconds, heartbeatLagSeconds float64, throttled bool) { if emit == nil { return } tags := []string{fmt.Sprintf("throttled:%t", throttled)} - emit.Histogram("lag.replication_seconds", replicationLagSeconds, tags...) - emit.Histogram("lag.heartbeat_seconds", heartbeatLagSeconds, tags...) + emit.Gauge("lag.replication_seconds", replicationLagSeconds, tags...) + emit.Gauge("lag.heartbeat_seconds", heartbeatLagSeconds, tags...) } diff --git a/go/metrics/lag_test.go b/go/metrics/lag_test.go index 2a0b21418..c330344e3 100644 --- a/go/metrics/lag_test.go +++ b/go/metrics/lag_test.go @@ -7,28 +7,16 @@ package metrics import "testing" -type histogramSpy struct { - names []string - values []float64 - tags [][]string -} - -func (s *histogramSpy) Histogram(name string, value float64, tags ...string) { - s.names = append(s.names, name) - s.values = append(s.values, value) - s.tags = append(s.tags, append([]string(nil), tags...)) -} - -func TestEmitLagHistograms_notThrottled(t *testing.T) { - spy := &histogramSpy{} - EmitLagHistograms(spy, 2.5, 1.25, false) +func TestEmitLagGauges_notThrottled(t *testing.T) { + spy := &gaugeSpy{} + EmitLagGauges(spy, 2.5, 1.25, false) wantNames := []string{"lag.replication_seconds", "lag.heartbeat_seconds"} wantVals := []float64{2.5, 1.25} wantTags := []string{"throttled:false"} - if len(spy.names) != 2 { - t.Fatalf("got %d histograms, want 2", len(spy.names)) + if len(spy.names) != len(wantNames) { + t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) } for i := range wantNames { if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { @@ -40,12 +28,12 @@ func TestEmitLagHistograms_notThrottled(t *testing.T) { } } -func TestEmitLagHistograms_throttled(t *testing.T) { - spy := &histogramSpy{} - EmitLagHistograms(spy, 4.0, 3.0, true) +func TestEmitLagGauges_throttled(t *testing.T) { + spy := &gaugeSpy{} + EmitLagGauges(spy, 4.0, 3.0, true) if len(spy.names) != 2 { - t.Fatalf("got %d histograms, want 2", len(spy.names)) + t.Fatalf("got %d gauges, want 2", len(spy.names)) } for i := range spy.names { if len(spy.tags[i]) != 1 || spy.tags[i][0] != "throttled:true" { @@ -54,6 +42,6 @@ func TestEmitLagHistograms_throttled(t *testing.T) { } } -func TestEmitLagHistograms_nilSafe(t *testing.T) { - EmitLagHistograms(nil, 1, 2, false) +func TestEmitLagGauges_nilSafe(t *testing.T) { + EmitLagGauges(nil, 1, 2, false) } From 9b8cb09101d37c0cbe46a70821934fbe1cc43da7 Mon Sep 17 00:00:00 2001 From: Patrick Begley Date: Mon, 1 Jun 2026 13:56:27 -0400 Subject: [PATCH 4/6] Refactor metrics emitter interface Use a single metrics emitter abstraction for gauge, count, and histogram samples so metric helpers share one testable client contract. --- go/base/context.go | 2 +- go/logic/migrator_test.go | 6 ++++++ go/metrics/binlog_backlog.go | 2 +- go/metrics/client.go | 13 +++++++++++-- go/metrics/go_runtime.go | 2 +- go/metrics/go_runtime_test.go | 6 ++++++ go/metrics/lag.go | 2 +- go/metrics/progress.go | 2 +- 8 files changed, 28 insertions(+), 7 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 5a06d0d4d..4d0c3b09d 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -238,7 +238,7 @@ type MigrationContext struct { AbortError error abortMutex *sync.Mutex - Metrics metrics.MemStatsGaugeEmitter + Metrics metrics.Emitter OriginalTableColumnsOnApplier *sql.ColumnList OriginalTableColumns *sql.ColumnList diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 74b09a47c..d939ae97d 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -419,6 +419,12 @@ func (s *progressGaugeSpy) Gauge(name string, value float64, tags ...string) { s.tags = append(s.tags, append([]string(nil), tags...)) } +func (s *progressGaugeSpy) Count(name string, value int64, tags ...string) { +} + +func (s *progressGaugeSpy) Histogram(name string, value float64, tags ...string) { +} + func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) { spy := &progressGaugeSpy{} ctx := base.NewMigrationContext() diff --git a/go/metrics/binlog_backlog.go b/go/metrics/binlog_backlog.go index 2d6ea5a4b..6ce102ae6 100644 --- a/go/metrics/binlog_backlog.go +++ b/go/metrics/binlog_backlog.go @@ -7,7 +7,7 @@ package metrics // EmitBinlogBacklogGauges emits apply-events queue depth gauges (namespace is applied by the client): // gh_ost.binlog.backlog_size, gh_ost.binlog.backlog_capacity, gh_ost.binlog.backlog_utilization. -func EmitBinlogBacklogGauges(emit MemStatsGaugeEmitter, backlogSize, backlogCapacity int) { +func EmitBinlogBacklogGauges(emit Emitter, backlogSize, backlogCapacity int) { if emit == nil { return } diff --git a/go/metrics/client.go b/go/metrics/client.go index 4600df8e9..1d78df216 100644 --- a/go/metrics/client.go +++ b/go/metrics/client.go @@ -21,9 +21,11 @@ type Client struct { sd *statsd.Client } -// MemStatsGaugeEmitter is implemented by *Client; used for tests without UDP. -type MemStatsGaugeEmitter interface { +// Emitter is implemented by *Client; used for tests without UDP. +type Emitter interface { Gauge(name string, value float64, tags ...string) + Count(name string, value int64, tags ...string) + Histogram(name string, value float64, tags ...string) } // NewClient connects to addr for StatsD. If addr is empty, returns Noop and nil error. @@ -65,6 +67,13 @@ func (c *Client) Count(name string, value int64, tags ...string) { _ = c.sd.Count(name, value, tags, 1.0) } +func (c *Client) Histogram(name string, value float64, tags ...string) { + if c.sd == nil { + return + } + _ = c.sd.Histogram(name, value, tags, 1.0) +} + // Close flushes buffered metrics; safe for Noop. func (c *Client) Close() error { if c.sd == nil { diff --git a/go/metrics/go_runtime.go b/go/metrics/go_runtime.go index 021e987bf..92526b339 100644 --- a/go/metrics/go_runtime.go +++ b/go/metrics/go_runtime.go @@ -13,7 +13,7 @@ import ( // EmitGoRuntimeGauges emits gh_ost.go_runtime.* gauges (namespace is applied by the client). // m and numGoroutine are typically from runtime.ReadMemStats and runtime.NumGoroutine. -func EmitGoRuntimeGauges(emit MemStatsGaugeEmitter, m *runtime.MemStats, numGoroutine int) { +func EmitGoRuntimeGauges(emit Emitter, m *runtime.MemStats, numGoroutine int) { if emit == nil || m == nil { return } diff --git a/go/metrics/go_runtime_test.go b/go/metrics/go_runtime_test.go index 0a8a5d7e2..02657b860 100644 --- a/go/metrics/go_runtime_test.go +++ b/go/metrics/go_runtime_test.go @@ -24,6 +24,12 @@ func (g *gaugeSpy) Gauge(name string, value float64, tags ...string) { g.tags = append(g.tags, append([]string(nil), tags...)) } +func (g *gaugeSpy) Count(name string, value int64, tags ...string) { +} + +func (g *gaugeSpy) Histogram(name string, value float64, tags ...string) { +} + func TestEmitGoRuntimeGauges(t *testing.T) { spy := &gaugeSpy{} m := &runtime.MemStats{ diff --git a/go/metrics/lag.go b/go/metrics/lag.go index b6dc6995a..9a03d4f85 100644 --- a/go/metrics/lag.go +++ b/go/metrics/lag.go @@ -13,7 +13,7 @@ import "fmt" // These are point-in-time readings each status tick (not a distribution), so gauges are used // rather than histograms — DogStatsD histogram aggregation exposes count/max series that do not // match the log line lag values in Prometheus/Grafana. -func EmitLagGauges(emit MemStatsGaugeEmitter, replicationLagSeconds, heartbeatLagSeconds float64, throttled bool) { +func EmitLagGauges(emit Emitter, replicationLagSeconds, heartbeatLagSeconds float64, throttled bool) { if emit == nil { return } diff --git a/go/metrics/progress.go b/go/metrics/progress.go index b6b3361b6..bc525bc8b 100644 --- a/go/metrics/progress.go +++ b/go/metrics/progress.go @@ -7,7 +7,7 @@ package metrics // EmitProgressGauges emits row-copy and DML progress gauges (namespace is applied by the client): // gh_ost.row_copy.rows_copied, gh_ost.row_copy.rows_estimate, gh_ost.dml.events_applied. -func EmitProgressGauges(emit MemStatsGaugeEmitter, rowsCopied, rowsEstimate, dmlEventsApplied int64) { +func EmitProgressGauges(emit Emitter, rowsCopied, rowsEstimate, dmlEventsApplied int64) { if emit == nil { return } From 63e4992b1269eadafe2a0fd2da5d2b100c280329 Mon Sep 17 00:00:00 2001 From: Patrick Begley Date: Mon, 1 Jun 2026 14:00:17 -0400 Subject: [PATCH 5/6] Consolidate metrics emit helpers Keep the small metric emission helpers, runtime reporter, and their tests together so the metrics package has fewer one-function files. --- go/metrics/binlog_backlog.go | 31 ----- go/metrics/binlog_backlog_test.go | 53 --------- go/metrics/emit.go | 108 ++++++++++++++++++ go/metrics/emit_test.go | 184 ++++++++++++++++++++++++++++++ go/metrics/go_runtime.go | 56 --------- go/metrics/go_runtime_test.go | 75 ------------ go/metrics/lag.go | 23 ---- go/metrics/lag_test.go | 47 -------- go/metrics/progress.go | 17 --- go/metrics/progress_test.go | 33 ------ 10 files changed, 292 insertions(+), 335 deletions(-) delete mode 100644 go/metrics/binlog_backlog.go delete mode 100644 go/metrics/binlog_backlog_test.go create mode 100644 go/metrics/emit.go create mode 100644 go/metrics/emit_test.go delete mode 100644 go/metrics/go_runtime.go delete mode 100644 go/metrics/go_runtime_test.go delete mode 100644 go/metrics/lag.go delete mode 100644 go/metrics/lag_test.go delete mode 100644 go/metrics/progress.go delete mode 100644 go/metrics/progress_test.go diff --git a/go/metrics/binlog_backlog.go b/go/metrics/binlog_backlog.go deleted file mode 100644 index 6ce102ae6..000000000 --- a/go/metrics/binlog_backlog.go +++ /dev/null @@ -1,31 +0,0 @@ -/* - Copyright 2026 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE -*/ - -package metrics - -// EmitBinlogBacklogGauges emits apply-events queue depth gauges (namespace is applied by the client): -// gh_ost.binlog.backlog_size, gh_ost.binlog.backlog_capacity, gh_ost.binlog.backlog_utilization. -func EmitBinlogBacklogGauges(emit Emitter, backlogSize, backlogCapacity int) { - if emit == nil { - return - } - emit.Gauge("binlog.backlog_size", float64(backlogSize)) - emit.Gauge("binlog.backlog_capacity", float64(backlogCapacity)) - emit.Gauge("binlog.backlog_utilization", binlogBacklogUtilization(backlogSize, backlogCapacity)) -} - -func binlogBacklogUtilization(backlogSize, backlogCapacity int) float64 { - if backlogCapacity <= 0 { - return 0 - } - utilization := float64(backlogSize) / float64(backlogCapacity) - if utilization > 1 { - return 1 - } - if utilization < 0 { - return 0 - } - return utilization -} diff --git a/go/metrics/binlog_backlog_test.go b/go/metrics/binlog_backlog_test.go deleted file mode 100644 index 4f99dc1b1..000000000 --- a/go/metrics/binlog_backlog_test.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - Copyright 2026 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE -*/ - -package metrics - -import "testing" - -func TestEmitBinlogBacklogGauges(t *testing.T) { - spy := &gaugeSpy{} - EmitBinlogBacklogGauges(spy, 250, 1000) - - wantNames := []string{ - "binlog.backlog_size", - "binlog.backlog_capacity", - "binlog.backlog_utilization", - } - wantVals := []float64{250, 1000, 0.25} - - if len(spy.names) != len(wantNames) { - t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) - } - for i := range wantNames { - if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { - t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) - } - } -} - -func TestEmitBinlogBacklogGauges_nilSafe(t *testing.T) { - EmitBinlogBacklogGauges(nil, 1, 2) -} - -func TestBinlogBacklogUtilization(t *testing.T) { - tests := []struct { - size, capacity int - want float64 - }{ - {0, 1000, 0}, - {250, 1000, 0.25}, - {1000, 1000, 1}, - {1500, 1000, 1}, - {-1, 1000, 0}, - {10, 0, 0}, - } - for _, tt := range tests { - got := binlogBacklogUtilization(tt.size, tt.capacity) - if got != tt.want { - t.Fatalf("utilization(%d, %d) = %v, want %v", tt.size, tt.capacity, got, tt.want) - } - } -} diff --git a/go/metrics/emit.go b/go/metrics/emit.go new file mode 100644 index 000000000..c24f0a5c4 --- /dev/null +++ b/go/metrics/emit.go @@ -0,0 +1,108 @@ +/* + Copyright 2026 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package metrics + +import ( + "context" + "fmt" + "runtime" + "time" +) + +// EmitProgressGauges emits row-copy and DML progress gauges (namespace is applied by the client): +// gh_ost.row_copy.rows_copied, gh_ost.row_copy.rows_estimate, gh_ost.dml.events_applied. +func EmitProgressGauges(emit Emitter, rowsCopied, rowsEstimate, dmlEventsApplied int64) { + if emit == nil { + return + } + emit.Gauge("row_copy.rows_copied", float64(rowsCopied)) + emit.Gauge("row_copy.rows_estimate", float64(rowsEstimate)) + emit.Gauge("dml.events_applied", float64(dmlEventsApplied)) +} + +// EmitBinlogBacklogGauges emits apply-events queue depth gauges (namespace is applied by the client): +// gh_ost.binlog.backlog_size, gh_ost.binlog.backlog_capacity, gh_ost.binlog.backlog_utilization. +func EmitBinlogBacklogGauges(emit Emitter, backlogSize, backlogCapacity int) { + if emit == nil { + return + } + emit.Gauge("binlog.backlog_size", float64(backlogSize)) + emit.Gauge("binlog.backlog_capacity", float64(backlogCapacity)) + emit.Gauge("binlog.backlog_utilization", binlogBacklogUtilization(backlogSize, backlogCapacity)) +} + +func binlogBacklogUtilization(backlogSize, backlogCapacity int) float64 { + if backlogCapacity <= 0 { + return 0 + } + utilization := float64(backlogSize) / float64(backlogCapacity) + if utilization > 1 { + return 1 + } + if utilization < 0 { + return 0 + } + return utilization +} + +// EmitLagGauges emits replication and heartbeat lag gauges (namespace is applied by the client): +// gh_ost.lag.replication_seconds, gh_ost.lag.heartbeat_seconds, each tagged throttled:true|false. +// +// These are point-in-time readings each status tick (not a distribution), so gauges are used +// rather than histograms; DogStatsD histogram aggregation exposes count/max series that do not +// match the log line lag values in Prometheus/Grafana. +func EmitLagGauges(emit Emitter, replicationLagSeconds, heartbeatLagSeconds float64, throttled bool) { + if emit == nil { + return + } + tags := []string{fmt.Sprintf("throttled:%t", throttled)} + emit.Gauge("lag.replication_seconds", replicationLagSeconds, tags...) + emit.Gauge("lag.heartbeat_seconds", heartbeatLagSeconds, tags...) +} + +// EmitGoRuntimeGauges emits gh_ost.go_runtime.* gauges (namespace is applied by the client). +// m and numGoroutine are typically from runtime.ReadMemStats and runtime.NumGoroutine. +func EmitGoRuntimeGauges(emit Emitter, m *runtime.MemStats, numGoroutine int) { + if emit == nil || m == nil { + return + } + emit.Gauge("go_runtime.alloc_bytes", float64(m.Alloc)) + emit.Gauge("go_runtime.sys_bytes", float64(m.Sys)) + emit.Gauge("go_runtime.heap_inuse_bytes", float64(m.HeapInuse)) + emit.Gauge("go_runtime.num_gc", float64(m.NumGC)) + emit.Gauge("go_runtime.gc_pause_total_ns", float64(m.PauseTotalNs)) + emit.Gauge("go_runtime.goroutines", float64(numGoroutine)) +} + +// StartGoRuntimeReporter periodically samples runtime memory and goroutines and emits gauges +// until ctx is cancelled. It is a no-op when interval <= 0, client is nil, or StatsD is disabled +// (noop client). +func StartGoRuntimeReporter(ctx context.Context, client *Client, interval time.Duration) { + if ctx == nil || client == nil || interval <= 0 || client.sd == nil { + return + } + + emit := func() { + var m runtime.MemStats + runtime.ReadMemStats(&m) + EmitGoRuntimeGauges(client, &m, runtime.NumGoroutine()) + } + + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + emit() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + emit() + } + } + }() +} diff --git a/go/metrics/emit_test.go b/go/metrics/emit_test.go new file mode 100644 index 000000000..acfffbb56 --- /dev/null +++ b/go/metrics/emit_test.go @@ -0,0 +1,184 @@ +/* + Copyright 2026 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package metrics + +import ( + "context" + "runtime" + "testing" + "time" +) + +type gaugeSpy struct { + names []string + values []float64 + tags [][]string +} + +func (g *gaugeSpy) Gauge(name string, value float64, tags ...string) { + g.names = append(g.names, name) + g.values = append(g.values, value) + g.tags = append(g.tags, append([]string(nil), tags...)) +} + +func (g *gaugeSpy) Count(name string, value int64, tags ...string) { +} + +func (g *gaugeSpy) Histogram(name string, value float64, tags ...string) { +} + +func TestEmitProgressGauges(t *testing.T) { + spy := &gaugeSpy{} + EmitProgressGauges(spy, 1000, 5000, 42) + + wantNames := []string{ + "row_copy.rows_copied", + "row_copy.rows_estimate", + "dml.events_applied", + } + wantVals := []float64{1000, 5000, 42} + + if len(spy.names) != len(wantNames) { + t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) + } + for i := range wantNames { + if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { + t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) + } + } +} + +func TestEmitProgressGauges_nilSafe(t *testing.T) { + EmitProgressGauges(nil, 1, 2, 3) +} + +func TestEmitBinlogBacklogGauges(t *testing.T) { + spy := &gaugeSpy{} + EmitBinlogBacklogGauges(spy, 250, 1000) + + wantNames := []string{ + "binlog.backlog_size", + "binlog.backlog_capacity", + "binlog.backlog_utilization", + } + wantVals := []float64{250, 1000, 0.25} + + if len(spy.names) != len(wantNames) { + t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) + } + for i := range wantNames { + if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { + t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) + } + } +} + +func TestEmitBinlogBacklogGauges_nilSafe(t *testing.T) { + EmitBinlogBacklogGauges(nil, 1, 2) +} + +func TestBinlogBacklogUtilization(t *testing.T) { + tests := []struct { + size, capacity int + want float64 + }{ + {0, 1000, 0}, + {250, 1000, 0.25}, + {1000, 1000, 1}, + {1500, 1000, 1}, + {-1, 1000, 0}, + {10, 0, 0}, + } + for _, tt := range tests { + got := binlogBacklogUtilization(tt.size, tt.capacity) + if got != tt.want { + t.Fatalf("utilization(%d, %d) = %v, want %v", tt.size, tt.capacity, got, tt.want) + } + } +} + +func TestEmitLagGauges_notThrottled(t *testing.T) { + spy := &gaugeSpy{} + EmitLagGauges(spy, 2.5, 1.25, false) + + wantNames := []string{"lag.replication_seconds", "lag.heartbeat_seconds"} + wantVals := []float64{2.5, 1.25} + wantTags := []string{"throttled:false"} + + if len(spy.names) != len(wantNames) { + t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) + } + for i := range wantNames { + if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { + t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) + } + if len(spy.tags[i]) != 1 || spy.tags[i][0] != wantTags[0] { + t.Fatalf("[%d] got tags %v want [%s]", i, spy.tags[i], wantTags[0]) + } + } +} + +func TestEmitLagGauges_throttled(t *testing.T) { + spy := &gaugeSpy{} + EmitLagGauges(spy, 4.0, 3.0, true) + + if len(spy.names) != 2 { + t.Fatalf("got %d gauges, want 2", len(spy.names)) + } + for i := range spy.names { + if len(spy.tags[i]) != 1 || spy.tags[i][0] != "throttled:true" { + t.Fatalf("[%d] got tags %v want [throttled:true]", i, spy.tags[i]) + } + } +} + +func TestEmitLagGauges_nilSafe(t *testing.T) { + EmitLagGauges(nil, 1, 2, false) +} + +func TestEmitGoRuntimeGauges(t *testing.T) { + spy := &gaugeSpy{} + m := &runtime.MemStats{ + Alloc: 100, + Sys: 200, + HeapInuse: 300, + NumGC: 7, + PauseTotalNs: 42, + } + EmitGoRuntimeGauges(spy, m, 123) + + wantNames := []string{ + "go_runtime.alloc_bytes", + "go_runtime.sys_bytes", + "go_runtime.heap_inuse_bytes", + "go_runtime.num_gc", + "go_runtime.gc_pause_total_ns", + "go_runtime.goroutines", + } + wantVals := []float64{100, 200, 300, 7, 42, 123} + + if len(spy.names) != len(wantNames) { + t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) + } + for i := range wantNames { + if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { + t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) + } + } +} + +func TestEmitGoRuntimeGauges_nilSafe(t *testing.T) { + EmitGoRuntimeGauges(nil, &runtime.MemStats{}, 1) + EmitGoRuntimeGauges(&gaugeSpy{}, nil, 1) +} + +func TestStartGoRuntimeReporter_stopsOnCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + c := &Client{} // sd nil, so the reporter should not start. + StartGoRuntimeReporter(ctx, c, time.Millisecond) + cancel() + time.Sleep(20 * time.Millisecond) +} diff --git a/go/metrics/go_runtime.go b/go/metrics/go_runtime.go deleted file mode 100644 index 92526b339..000000000 --- a/go/metrics/go_runtime.go +++ /dev/null @@ -1,56 +0,0 @@ -/* - Copyright 2022 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE -*/ - -package metrics - -import ( - "context" - "runtime" - "time" -) - -// EmitGoRuntimeGauges emits gh_ost.go_runtime.* gauges (namespace is applied by the client). -// m and numGoroutine are typically from runtime.ReadMemStats and runtime.NumGoroutine. -func EmitGoRuntimeGauges(emit Emitter, m *runtime.MemStats, numGoroutine int) { - if emit == nil || m == nil { - return - } - emit.Gauge("go_runtime.alloc_bytes", float64(m.Alloc)) - emit.Gauge("go_runtime.sys_bytes", float64(m.Sys)) - emit.Gauge("go_runtime.heap_inuse_bytes", float64(m.HeapInuse)) - emit.Gauge("go_runtime.num_gc", float64(m.NumGC)) - emit.Gauge("go_runtime.gc_pause_total_ns", float64(m.PauseTotalNs)) - emit.Gauge("go_runtime.goroutines", float64(numGoroutine)) -} - -// StartGoRuntimeReporter periodically samples runtime memory and goroutines and emits gauges -// until ctx is cancelled. It is a no-op when interval <= 0, client is nil, or StatsD is disabled -// (noop client). -func StartGoRuntimeReporter(ctx context.Context, client *Client, interval time.Duration) { - if ctx == nil || client == nil || interval <= 0 || client.sd == nil { - return - } - - emit := func() { - var m runtime.MemStats - runtime.ReadMemStats(&m) - EmitGoRuntimeGauges(client, &m, runtime.NumGoroutine()) - } - - go func() { - ticker := time.NewTicker(interval) - defer ticker.Stop() - - emit() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - emit() - } - } - }() -} diff --git a/go/metrics/go_runtime_test.go b/go/metrics/go_runtime_test.go deleted file mode 100644 index 02657b860..000000000 --- a/go/metrics/go_runtime_test.go +++ /dev/null @@ -1,75 +0,0 @@ -/* - Copyright 2022 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE -*/ - -package metrics - -import ( - "context" - "runtime" - "testing" - "time" -) - -type gaugeSpy struct { - names []string - values []float64 - tags [][]string -} - -func (g *gaugeSpy) Gauge(name string, value float64, tags ...string) { - g.names = append(g.names, name) - g.values = append(g.values, value) - g.tags = append(g.tags, append([]string(nil), tags...)) -} - -func (g *gaugeSpy) Count(name string, value int64, tags ...string) { -} - -func (g *gaugeSpy) Histogram(name string, value float64, tags ...string) { -} - -func TestEmitGoRuntimeGauges(t *testing.T) { - spy := &gaugeSpy{} - m := &runtime.MemStats{ - Alloc: 100, - Sys: 200, - HeapInuse: 300, - NumGC: 7, - PauseTotalNs: 42, - } - EmitGoRuntimeGauges(spy, m, 123) - - wantNames := []string{ - "go_runtime.alloc_bytes", - "go_runtime.sys_bytes", - "go_runtime.heap_inuse_bytes", - "go_runtime.num_gc", - "go_runtime.gc_pause_total_ns", - "go_runtime.goroutines", - } - wantVals := []float64{100, 200, 300, 7, 42, 123} - - if len(spy.names) != len(wantNames) { - t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) - } - for i := range wantNames { - if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { - t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) - } - } -} - -func TestEmitGoRuntimeGauges_nilSafe(t *testing.T) { - EmitGoRuntimeGauges(nil, &runtime.MemStats{}, 1) - EmitGoRuntimeGauges(&gaugeSpy{}, nil, 1) -} - -func TestStartGoRuntimeReporter_stopsOnCancel(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - c := &Client{} // sd nil — should not start - StartGoRuntimeReporter(ctx, c, time.Millisecond) - cancel() - time.Sleep(20 * time.Millisecond) -} diff --git a/go/metrics/lag.go b/go/metrics/lag.go deleted file mode 100644 index 9a03d4f85..000000000 --- a/go/metrics/lag.go +++ /dev/null @@ -1,23 +0,0 @@ -/* - Copyright 2022 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE -*/ - -package metrics - -import "fmt" - -// EmitLagGauges emits replication and heartbeat lag gauges (namespace is applied by the client): -// gh_ost.lag.replication_seconds, gh_ost.lag.heartbeat_seconds, each tagged throttled:true|false. -// -// These are point-in-time readings each status tick (not a distribution), so gauges are used -// rather than histograms — DogStatsD histogram aggregation exposes count/max series that do not -// match the log line lag values in Prometheus/Grafana. -func EmitLagGauges(emit Emitter, replicationLagSeconds, heartbeatLagSeconds float64, throttled bool) { - if emit == nil { - return - } - tags := []string{fmt.Sprintf("throttled:%t", throttled)} - emit.Gauge("lag.replication_seconds", replicationLagSeconds, tags...) - emit.Gauge("lag.heartbeat_seconds", heartbeatLagSeconds, tags...) -} diff --git a/go/metrics/lag_test.go b/go/metrics/lag_test.go deleted file mode 100644 index c330344e3..000000000 --- a/go/metrics/lag_test.go +++ /dev/null @@ -1,47 +0,0 @@ -/* - Copyright 2022 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE -*/ - -package metrics - -import "testing" - -func TestEmitLagGauges_notThrottled(t *testing.T) { - spy := &gaugeSpy{} - EmitLagGauges(spy, 2.5, 1.25, false) - - wantNames := []string{"lag.replication_seconds", "lag.heartbeat_seconds"} - wantVals := []float64{2.5, 1.25} - wantTags := []string{"throttled:false"} - - if len(spy.names) != len(wantNames) { - t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) - } - for i := range wantNames { - if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { - t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) - } - if len(spy.tags[i]) != 1 || spy.tags[i][0] != wantTags[0] { - t.Fatalf("[%d] got tags %v want [%s]", i, spy.tags[i], wantTags[0]) - } - } -} - -func TestEmitLagGauges_throttled(t *testing.T) { - spy := &gaugeSpy{} - EmitLagGauges(spy, 4.0, 3.0, true) - - if len(spy.names) != 2 { - t.Fatalf("got %d gauges, want 2", len(spy.names)) - } - for i := range spy.names { - if len(spy.tags[i]) != 1 || spy.tags[i][0] != "throttled:true" { - t.Fatalf("[%d] got tags %v want [throttled:true]", i, spy.tags[i]) - } - } -} - -func TestEmitLagGauges_nilSafe(t *testing.T) { - EmitLagGauges(nil, 1, 2, false) -} diff --git a/go/metrics/progress.go b/go/metrics/progress.go deleted file mode 100644 index bc525bc8b..000000000 --- a/go/metrics/progress.go +++ /dev/null @@ -1,17 +0,0 @@ -/* - Copyright 2022 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE -*/ - -package metrics - -// EmitProgressGauges emits row-copy and DML progress gauges (namespace is applied by the client): -// gh_ost.row_copy.rows_copied, gh_ost.row_copy.rows_estimate, gh_ost.dml.events_applied. -func EmitProgressGauges(emit Emitter, rowsCopied, rowsEstimate, dmlEventsApplied int64) { - if emit == nil { - return - } - emit.Gauge("row_copy.rows_copied", float64(rowsCopied)) - emit.Gauge("row_copy.rows_estimate", float64(rowsEstimate)) - emit.Gauge("dml.events_applied", float64(dmlEventsApplied)) -} diff --git a/go/metrics/progress_test.go b/go/metrics/progress_test.go deleted file mode 100644 index 4a6ce5d3b..000000000 --- a/go/metrics/progress_test.go +++ /dev/null @@ -1,33 +0,0 @@ -/* - Copyright 2022 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE -*/ - -package metrics - -import "testing" - -func TestEmitProgressGauges(t *testing.T) { - spy := &gaugeSpy{} - EmitProgressGauges(spy, 1000, 5000, 42) - - wantNames := []string{ - "row_copy.rows_copied", - "row_copy.rows_estimate", - "dml.events_applied", - } - wantVals := []float64{1000, 5000, 42} - - if len(spy.names) != len(wantNames) { - t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) - } - for i := range wantNames { - if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { - t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) - } - } -} - -func TestEmitProgressGauges_nilSafe(t *testing.T) { - EmitProgressGauges(nil, 1, 2, 3) -} From 36da844c1cb2c2ec3befc65aa46f76f005aa3763 Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Tue, 2 Jun 2026 11:33:29 +0200 Subject: [PATCH 6/6] Emit query latency metrics Add a query duration helper and instrument source/target query paths with millisecond latency histograms. Metrics emitted: - query.duration_milliseconds tagged with side, kind, and outcome Initial coverage includes chunk copy, binlog apply, range select, and exact row count queries. This lets dashboards compare source and target query latency side-by-side and break down latency by query kind. Add unit coverage for the query duration metric helper. --- go/logic/applier.go | 12 ++++++++++++ go/logic/inspect.go | 4 ++++ go/metrics/emit.go | 20 ++++++++++++++++++++ go/metrics/emit_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 77 insertions(+) diff --git a/go/logic/applier.go b/go/logic/applier.go index 3bf32e124..3d88acc0a 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -16,6 +16,7 @@ import ( "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/metrics" "github.com/github/gh-ost/go/sql" "context" @@ -893,8 +894,10 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool return hasFurtherRange, err } + queryStartTime := time.Now() rows, err := apl.db.Query(query, explodedArgs...) if err != nil { + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err) return hasFurtherRange, err } defer rows.Close() @@ -902,13 +905,16 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool iterationRangeMaxValues := sql.NewColumnValues(apl.migrationContext.UniqueKey.Len()) for rows.Next() { if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil { + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err) return hasFurtherRange, err } hasFurtherRange = true } if err = rows.Err(); err != nil { + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err) return hasFurtherRange, err } + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), nil) if hasFurtherRange { apl.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues return hasFurtherRange, nil @@ -956,7 +962,9 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i if _, err := tx.Exec(sessionQuery); err != nil { return nil, err } + queryStartTime := time.Now() result, err := tx.Exec(query, explodedArgs...) + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "chunk_copy", time.Since(queryStartTime), err) if err != nil { return nil, err } @@ -1669,13 +1677,16 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e // in the batch. SHOW WARNINGS only shows warnings from the last statement in a // multi-statement query, so we interleave SHOW WARNINGS after each DML statement. if apl.migrationContext.PanicOnWarnings { + queryStartTime := time.Now() totalDelta, err = apl.executeBatchWithWarningChecking(ctx, tx, buildResults) + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), err) if err != nil { return rollback(err) } } else { // Fast path: batch together DML queries into multi-statements to minimize network trips. // We use the raw driver connection to access the rows affected for each statement. + queryStartTime := time.Now() execErr := conn.Raw(func(driverConn any) error { ex := driverConn.(driver.ExecerContext) nvc := driverConn.(driver.NamedValueChecker) @@ -1709,6 +1720,7 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e return nil }) + metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), execErr) if execErr != nil { return rollback(execErr) } diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 96aadd672..982be1d98 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -16,6 +16,7 @@ import ( "time" "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/metrics" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" @@ -677,13 +678,16 @@ func (isp *Inspector) CountTableRows(ctx context.Context) error { query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) var rowsEstimate int64 + queryStartTime := time.Now() if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil { + metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), err) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { isp.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err()) return mysql.Kill(isp.db, connectionID) } return err } + metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), nil) // row count query finished. nil out the cancel func, so the main migration thread // doesn't bother calling it after row copy is done. diff --git a/go/metrics/emit.go b/go/metrics/emit.go index c24f0a5c4..8cd0add83 100644 --- a/go/metrics/emit.go +++ b/go/metrics/emit.go @@ -106,3 +106,23 @@ func StartGoRuntimeReporter(ctx context.Context, client *Client, interval time.D } }() } + +type queryHistogramEmitter interface { + Histogram(name string, value float64, tags ...string) +} + +// RecordQueryDuration emits gh_ost.query.duration_milliseconds with side/kind/outcome tags. +func RecordQueryDuration(emit any, side string, kind string, duration time.Duration, err error) { + if emit == nil || side == "" || kind == "" || duration < 0 { + return + } + histogramEmitter, ok := emit.(queryHistogramEmitter) + if !ok { + return + } + outcome := "ok" + if err != nil { + outcome = "error" + } + histogramEmitter.Histogram("query.duration_milliseconds", float64(duration.Milliseconds()), "side:"+side, "kind:"+kind, "outcome:"+outcome) +} diff --git a/go/metrics/emit_test.go b/go/metrics/emit_test.go index acfffbb56..366f246bb 100644 --- a/go/metrics/emit_test.go +++ b/go/metrics/emit_test.go @@ -7,7 +7,9 @@ package metrics import ( "context" + "errors" "runtime" + "slices" "testing" "time" ) @@ -182,3 +184,42 @@ func TestStartGoRuntimeReporter_stopsOnCancel(t *testing.T) { cancel() time.Sleep(20 * time.Millisecond) } + +type histogramSpy struct { + names []string + values []float64 + tags [][]string +} + +func (h *histogramSpy) Histogram(name string, value float64, tags ...string) { + h.names = append(h.names, name) + h.values = append(h.values, value) + h.tags = append(h.tags, tags) +} + +func TestRecordQueryDuration(t *testing.T) { + spy := &histogramSpy{} + + RecordQueryDuration(spy, "source", "row_count", 1500*time.Millisecond, nil) + RecordQueryDuration(spy, "target", "binlog_apply", 2*time.Second, errors.New("boom")) + + if len(spy.names) != 2 { + t.Fatalf("got %d histograms, want 2", len(spy.names)) + } + if spy.names[0] != "query.duration_milliseconds" || spy.values[0] != 1500 { + t.Fatalf("got %s=%v, want query.duration_milliseconds=1500", spy.names[0], spy.values[0]) + } + if !slices.Equal(spy.tags[0], []string{"side:source", "kind:row_count", "outcome:ok"}) { + t.Fatalf("got tags %#v", spy.tags[0]) + } + if spy.values[1] != 2000 || !slices.Equal(spy.tags[1], []string{"side:target", "kind:binlog_apply", "outcome:error"}) { + t.Fatalf("got second metric value=%v tags=%#v", spy.values[1], spy.tags[1]) + } +} + +func TestRecordQueryDurationNilSafe(t *testing.T) { + RecordQueryDuration(nil, "source", "row_count", time.Second, nil) + RecordQueryDuration(&histogramSpy{}, "", "row_count", time.Second, nil) + RecordQueryDuration(&histogramSpy{}, "source", "", time.Second, nil) + RecordQueryDuration(&histogramSpy{}, "source", "row_count", -time.Second, nil) +}