From fdce897c2b54e64075cef985bfb20d61e400672d Mon Sep 17 00:00:00 2001 From: Patrick Begley Date: Wed, 27 May 2026 10:19:16 -0400 Subject: [PATCH 1/2] Emit binlog backlog metrics --- go/logic/migrator.go | 5 +++ go/logic/migrator_test.go | 39 +++++++++++++++++++---- go/metrics/binlog_backlog.go | 31 ++++++++++++++++++ go/metrics/binlog_backlog_test.go | 53 +++++++++++++++++++++++++++++++ 4 files changed, 121 insertions(+), 7 deletions(-) create mode 100644 go/metrics/binlog_backlog.go create mode 100644 go/metrics/binlog_backlog_test.go diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 584b5ba1c..5ed2e2d9a 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1176,6 +1176,11 @@ func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) { snap.rowsEstimate, snap.dmlApplied, ) + metrics.EmitBinlogBacklogGauges( + mgtr.migrationContext.Metrics, + snap.applyEventsBacklog, + snap.applyEventsCapacity, + ) } // reportStatus samples progress, emits metrics, and optionally prints status output. diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 1a42b8459..31e161b5e 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -427,21 +427,46 @@ func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) { migrator := NewMigrator(ctx, "test") migrator.reportStatus(NoPrintStatusRule, io.Discard) - require.Len(t, spy.names, 3) + require.Len(t, spy.names, 6) assert.Equal(t, []string{ "row_copy.rows_copied", "row_copy.rows_estimate", "dml.events_applied", + "binlog.backlog_size", + "binlog.backlog_capacity", + "binlog.backlog_utilization", }, spy.names) - assert.Equal(t, []float64{1000, 5000, 42}, spy.values) + assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values) assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01) spy.names = nil spy.values = nil atomic.StoreInt64(&ctx.TotalDMLEventsApplied, 100) migrator.reportStatus(NoPrintStatusRule, io.Discard) - require.Len(t, spy.names, 3) - assert.Equal(t, []float64{1000, 5000, 100}, spy.values) + require.Len(t, spy.names, 6) + assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values) +} + +func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) { + spy := &progressGaugeSpy{} + ctx := base.NewMigrationContext() + ctx.Metrics = spy + + migrator := NewMigrator(ctx, "test") + migrator.applyEventsQueue <- newApplyEventStructByDML(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{DML: binlog.InsertDML}, + }) + migrator.applyEventsQueue <- newApplyEventStructByDML(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{DML: binlog.InsertDML}, + }) + + migrator.reportStatus(NoPrintStatusRule, io.Discard) + + capacity := float64(cap(migrator.applyEventsQueue)) + require.Len(t, spy.names, 6) + assert.Equal(t, float64(2), spy.values[3]) + assert.Equal(t, capacity, spy.values[4]) + assert.InDelta(t, 2/capacity, spy.values[5], 1e-9) } func TestReportStatusEmitsGaugesWhenRowCopyComplete(t *testing.T) { @@ -455,7 +480,7 @@ func TestReportStatusEmitsGaugesWhenRowCopyComplete(t *testing.T) { atomic.StoreInt64(&migrator.rowCopyCompleteFlag, 1) migrator.reportStatus(NoPrintStatusRule, io.Discard) - require.Len(t, spy.names, 3) + require.Len(t, spy.names, 6) assert.Equal(t, float64(5000), spy.values[0]) assert.Equal(t, float64(5000), spy.values[1], "rows_estimate tracks rows_copied when row copy is complete") } @@ -475,8 +500,8 @@ func TestReportStatusEmitsGaugesWhenPrintSuppressed(t *testing.T) { require.False(t, migrator.shouldPrintStatus(HeuristicPrintStatusRule, snap.elapsedSeconds, etaDuration)) migrator.reportStatus(HeuristicPrintStatusRule, io.Discard) - require.Len(t, spy.names, 3) - assert.Equal(t, []float64{1000, 5000, 0}, spy.values) + require.Len(t, spy.names, 6) + assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values) } func TestMigratorShouldPrintStatus(t *testing.T) { diff --git a/go/metrics/binlog_backlog.go b/go/metrics/binlog_backlog.go new file mode 100644 index 000000000..dda643c92 --- /dev/null +++ b/go/metrics/binlog_backlog.go @@ -0,0 +1,31 @@ +/* + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package metrics + +// EmitBinlogBacklogGauges emits apply-events queue depth gauges (namespace is applied by the client): +// gh_ost.binlog.backlog_size, gh_ost.binlog.backlog_capacity, gh_ost.binlog.backlog_utilization. +func EmitBinlogBacklogGauges(emit MemStatsGaugeEmitter, backlogSize, backlogCapacity int) { + if emit == nil { + return + } + emit.Gauge("binlog.backlog_size", float64(backlogSize)) + emit.Gauge("binlog.backlog_capacity", float64(backlogCapacity)) + emit.Gauge("binlog.backlog_utilization", binlogBacklogUtilization(backlogSize, backlogCapacity)) +} + +func binlogBacklogUtilization(backlogSize, backlogCapacity int) float64 { + if backlogCapacity <= 0 { + return 0 + } + utilization := float64(backlogSize) / float64(backlogCapacity) + if utilization > 1 { + return 1 + } + if utilization < 0 { + return 0 + } + return utilization +} diff --git a/go/metrics/binlog_backlog_test.go b/go/metrics/binlog_backlog_test.go new file mode 100644 index 000000000..d61e991a7 --- /dev/null +++ b/go/metrics/binlog_backlog_test.go @@ -0,0 +1,53 @@ +/* + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package metrics + +import "testing" + +func TestEmitBinlogBacklogGauges(t *testing.T) { + spy := &gaugeSpy{} + EmitBinlogBacklogGauges(spy, 250, 1000) + + wantNames := []string{ + "binlog.backlog_size", + "binlog.backlog_capacity", + "binlog.backlog_utilization", + } + wantVals := []float64{250, 1000, 0.25} + + if len(spy.names) != len(wantNames) { + t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames)) + } + for i := range wantNames { + if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] { + t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i]) + } + } +} + +func TestEmitBinlogBacklogGauges_nilSafe(t *testing.T) { + EmitBinlogBacklogGauges(nil, 1, 2) +} + +func TestBinlogBacklogUtilization(t *testing.T) { + tests := []struct { + size, capacity int + want float64 + }{ + {0, 1000, 0}, + {250, 1000, 0.25}, + {1000, 1000, 1}, + {1500, 1000, 1}, + {-1, 1000, 0}, + {10, 0, 0}, + } + for _, tt := range tests { + got := binlogBacklogUtilization(tt.size, tt.capacity) + if got != tt.want { + t.Fatalf("utilization(%d, %d) = %v, want %v", tt.size, tt.capacity, got, tt.want) + } + } +} From 12534169a03f700a0fa3b5559ea3763d5d6c2a3c Mon Sep 17 00:00:00 2001 From: Patrick Begley Date: Mon, 1 Jun 2026 13:15:20 -0400 Subject: [PATCH 2/2] update copyright dates --- go/metrics/binlog_backlog.go | 2 +- go/metrics/binlog_backlog_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/metrics/binlog_backlog.go b/go/metrics/binlog_backlog.go index dda643c92..2d6ea5a4b 100644 --- a/go/metrics/binlog_backlog.go +++ b/go/metrics/binlog_backlog.go @@ -1,5 +1,5 @@ /* - Copyright 2022 GitHub Inc. + Copyright 2026 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ diff --git a/go/metrics/binlog_backlog_test.go b/go/metrics/binlog_backlog_test.go index d61e991a7..4f99dc1b1 100644 --- a/go/metrics/binlog_backlog_test.go +++ b/go/metrics/binlog_backlog_test.go @@ -1,5 +1,5 @@ /* - Copyright 2022 GitHub Inc. + Copyright 2026 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */