Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ func (mgtr *Migrator) initiateInspector() (err error) {
return nil
}

// emitProgressMetrics emits StatsD gauges from a progress snapshot.
// emitProgressMetrics emits StatsD gauges
func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) {
metrics.EmitProgressGauges(
mgtr.migrationContext.Metrics,
Expand All @@ -1181,6 +1181,13 @@ func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) {
snap.applyEventsBacklog,
snap.applyEventsCapacity,
)
isThrottled, _, _ := mgtr.migrationContext.IsThrottled()
metrics.EmitLagGauges(
mgtr.migrationContext.Metrics,
snap.replicationLagSeconds,
snap.heartbeatLagSeconds,
isThrottled,
)
}

// reportStatus samples progress, emits metrics, and optionally prints status output.
Expand Down
41 changes: 32 additions & 9 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,13 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) {
// progressGaugeSpy is an in-memory test double for the metrics client. Its Gauge
// method records every call so tests can assert on what was emitted; the three
// slices are appended to together, so index i in each describes the same call.
type progressGaugeSpy struct {
// names holds the metric name of each Gauge call, in call order.
names []string
// values holds the gauge value of each call, parallel to names.
values []float64
// tags holds a copy of the tags passed to each call, parallel to names.
tags [][]string
}

func (s *progressGaugeSpy) Gauge(name string, value float64, _ ...string) {
func (s *progressGaugeSpy) Gauge(name string, value float64, tags ...string) {
s.names = append(s.names, name)
s.values = append(s.values, value)
s.tags = append(s.tags, append([]string(nil), tags...))
}

func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) {
Expand All @@ -427,24 +429,26 @@ func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) {

migrator := NewMigrator(ctx, "test")
migrator.reportStatus(NoPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, []string{
"row_copy.rows_copied",
"row_copy.rows_estimate",
"dml.events_applied",
"binlog.backlog_size",
"binlog.backlog_capacity",
"binlog.backlog_utilization",
"lag.replication_seconds",
"lag.heartbeat_seconds",
}, spy.names)
assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01)

spy.names = nil
spy.values = nil
atomic.StoreInt64(&ctx.TotalDMLEventsApplied, 100)
migrator.reportStatus(NoPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
require.Len(t, spy.names, 8)
assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
}

func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) {
Expand All @@ -463,7 +467,7 @@ func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) {
migrator.reportStatus(NoPrintStatusRule, io.Discard)

capacity := float64(cap(migrator.applyEventsQueue))
require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, float64(2), spy.values[3])
assert.Equal(t, capacity, spy.values[4])
assert.InDelta(t, 2/capacity, spy.values[5], 1e-9)
Expand All @@ -480,7 +484,7 @@ func TestReportStatusEmitsGaugesWhenRowCopyComplete(t *testing.T) {
atomic.StoreInt64(&migrator.rowCopyCompleteFlag, 1)
migrator.reportStatus(NoPrintStatusRule, io.Discard)

require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, float64(5000), spy.values[0])
assert.Equal(t, float64(5000), spy.values[1], "rows_estimate tracks rows_copied when row copy is complete")
}
Expand All @@ -500,8 +504,27 @@ func TestReportStatusEmitsGaugesWhenPrintSuppressed(t *testing.T) {
require.False(t, migrator.shouldPrintStatus(HeuristicPrintStatusRule, snap.elapsedSeconds, etaDuration))

migrator.reportStatus(HeuristicPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
require.Len(t, spy.names, 8)
assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
}

// TestReportStatusEmitsLagGaugesWhenThrottled verifies that, with the migration context
// marked as throttled, reportStatus emits lag.replication_seconds and
// lag.heartbeat_seconds at gauge positions 6 and 7 and tags each with throttled:true.
// The lag values themselves are not asserted here.
func TestReportStatusEmitsLagGaugesWhenThrottled(t *testing.T) {
	spy := &progressGaugeSpy{}
	ctx := base.NewMigrationContext()
	ctx.Metrics = spy
	ctx.SetThrottled(true, "max-lag-millis", base.NoThrottleReasonHint)
	atomic.StoreInt64(&ctx.CurrentLag, int64(5*time.Second))
	ctx.SetLastHeartbeatOnChangelogTime(time.Now().Add(-4 * time.Second))

	migrator := NewMigrator(ctx, "test")
	migrator.reportStatus(NoPrintStatusRule, io.Discard)

	require.GreaterOrEqual(t, len(spy.names), 8)
	assert.Equal(t, "lag.replication_seconds", spy.names[6])
	assert.Equal(t, "lag.heartbeat_seconds", spy.names[7])

	// Guard both tag slices before indexing into them so that a missing tag is
	// reported as a test failure rather than an index-out-of-range panic.
	require.Len(t, spy.tags[6], 1)
	require.Len(t, spy.tags[7], 1)
	assert.Equal(t, "throttled:true", spy.tags[6][0])
	assert.Equal(t, "throttled:true", spy.tags[7][0])
}

func TestMigratorShouldPrintStatus(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions go/metrics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ type Client struct {
sd *statsd.Client
}

// MemStatsGaugeEmitter is the minimal gauge-emitting interface accepted by the
// Emit*Gauges helpers in this package. It is implemented by *Client; tests
// substitute an in-memory spy so that no UDP traffic is needed.
type MemStatsGaugeEmitter interface {
// Gauge records value under the metric name, optionally qualified by tags.
Gauge(name string, value float64, tags ...string)
}

// NewClient connects to addr for StatsD. If addr is empty, returns Noop and nil error.
// namespace is typically "gh_ost." (metrics are named namespace + short name, e.g. gh_ost.startup).
// tags are global tags applied to every metric (repeatable --statsd-tags).
Expand Down
5 changes: 0 additions & 5 deletions go/metrics/go_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ import (
"time"
)

// MemStatsGaugeEmitter is implemented by *Client; used for tests without UDP.
type MemStatsGaugeEmitter interface {
Gauge(name string, value float64, tags ...string)
}

// EmitGoRuntimeGauges emits gh_ost.go_runtime.* gauges (namespace is applied by the client).
// m and numGoroutine are typically from runtime.ReadMemStats and runtime.NumGoroutine.
func EmitGoRuntimeGauges(emit MemStatsGaugeEmitter, m *runtime.MemStats, numGoroutine int) {
Expand Down
4 changes: 3 additions & 1 deletion go/metrics/go_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ import (
// gaugeSpy is an in-memory test double whose Gauge method records every call.
// The three slices are appended to together, so index i in each describes the
// same Gauge call.
type gaugeSpy struct {
// names holds the metric name of each Gauge call, in call order.
names []string
// values holds the gauge value of each call, parallel to names.
values []float64
// tags holds a copy of the tags passed to each call, parallel to names.
tags [][]string
}

func (g *gaugeSpy) Gauge(name string, value float64, _ ...string) {
func (g *gaugeSpy) Gauge(name string, value float64, tags ...string) {
g.names = append(g.names, name)
g.values = append(g.values, value)
g.tags = append(g.tags, append([]string(nil), tags...))
}

func TestEmitGoRuntimeGauges(t *testing.T) {
Expand Down
23 changes: 23 additions & 0 deletions go/metrics/lag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2022 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package metrics

import "fmt"

// EmitLagGauges reports the current replication lag and heartbeat lag, in seconds, as
// the gauges lag.replication_seconds and lag.heartbeat_seconds (the client prepends its
// namespace, e.g. gh_ost.). Both gauges carry one tag, throttled:true or
// throttled:false, reflecting the throttled argument. A nil emitter makes this a no-op.
//
// Gauges are used instead of histograms because each value is a point-in-time reading
// taken once per status tick, not a distribution: DogStatsD histogram aggregation
// exposes count/max series that do not match the lag values printed in the log line
// when viewed in Prometheus/Grafana.
func EmitLagGauges(emit MemStatsGaugeEmitter, replicationLagSeconds, heartbeatLagSeconds float64, throttled bool) {
	if emit == nil {
		return
	}

	// One tag slice is built up front and shared by both gauges.
	throttledTag := []string{fmt.Sprintf("throttled:%t", throttled)}

	emit.Gauge("lag.replication_seconds", replicationLagSeconds, throttledTag...)
	emit.Gauge("lag.heartbeat_seconds", heartbeatLagSeconds, throttledTag...)
}
47 changes: 47 additions & 0 deletions go/metrics/lag_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Copyright 2022 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package metrics

import "testing"

// TestEmitLagGauges_notThrottled checks that EmitLagGauges emits the replication and
// heartbeat lag gauges in that order, with the supplied values, and that each carries
// exactly one tag, throttled:false, when the throttled flag is false.
func TestEmitLagGauges_notThrottled(t *testing.T) {
	type gauge struct {
		name  string
		value float64
	}
	const expectedTag = "throttled:false"
	expected := []gauge{
		{name: "lag.replication_seconds", value: 2.5},
		{name: "lag.heartbeat_seconds", value: 1.25},
	}

	spy := &gaugeSpy{}
	EmitLagGauges(spy, 2.5, 1.25, false)

	if emitted := len(spy.names); emitted != len(expected) {
		t.Fatalf("got %d gauges, want %d", emitted, len(expected))
	}
	for i, want := range expected {
		if spy.names[i] != want.name || spy.values[i] != want.value {
			t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], want.name, want.value)
		}
		if got := spy.tags[i]; len(got) != 1 || got[0] != expectedTag {
			t.Fatalf("[%d] got tags %v want [%s]", i, got, expectedTag)
		}
	}
}

// TestEmitLagGauges_throttled checks that EmitLagGauges emits exactly two gauges and
// that each one carries the single tag throttled:true when the throttled flag is set.
func TestEmitLagGauges_throttled(t *testing.T) {
	spy := &gaugeSpy{}
	EmitLagGauges(spy, 4.0, 3.0, true)

	if emitted := len(spy.names); emitted != 2 {
		t.Fatalf("got %d gauges, want 2", emitted)
	}
	for i := 0; i < len(spy.names); i++ {
		gaugeTags := spy.tags[i]
		if len(gaugeTags) == 1 && gaugeTags[0] == "throttled:true" {
			continue
		}
		t.Fatalf("[%d] got tags %v want [throttled:true]", i, gaugeTags)
	}
}

// TestEmitLagGauges_nilSafe checks that EmitLagGauges accepts a nil emitter without
// panicking; the test passes simply by returning.
func TestEmitLagGauges_nilSafe(t *testing.T) {
	const (
		replicationLag = 1
		heartbeatLag   = 2
	)
	EmitLagGauges(nil, replicationLag, heartbeatLag, false)
}
Loading