Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ type MigrationContext struct {
AbortError error
abortMutex *sync.Mutex

Metrics metrics.MemStatsGaugeEmitter
Metrics metrics.Emitter

OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
Expand Down
9 changes: 8 additions & 1 deletion go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ func (mgtr *Migrator) initiateInspector() (err error) {
return nil
}

// emitProgressMetrics emits StatsD gauges from a progress snapshot.
// emitProgressMetrics emits StatsD gauges
func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) {
metrics.EmitProgressGauges(
mgtr.migrationContext.Metrics,
Expand All @@ -1181,6 +1181,13 @@ func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) {
snap.applyEventsBacklog,
snap.applyEventsCapacity,
)
isThrottled, _, _ := mgtr.migrationContext.IsThrottled()
metrics.EmitLagGauges(
mgtr.migrationContext.Metrics,
snap.replicationLagSeconds,
snap.heartbeatLagSeconds,
isThrottled,
)
}

// reportStatus samples progress, emits metrics, and optionally prints status output.
Expand Down
47 changes: 38 additions & 9 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,19 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) {
// progressGaugeSpy is a test double that records the gauge emissions it
// receives so tests can assert on metric names, values, and tags.
// Entries at the same index in names, values, and tags belong to the same
// Gauge call.
type progressGaugeSpy struct {
// names holds the metric name of each recorded Gauge call, in call order.
names []string
// values holds the value of each recorded Gauge call, in call order.
values []float64
// tags holds a copy of the tags passed to each recorded Gauge call, in call order.
tags [][]string
}

func (s *progressGaugeSpy) Gauge(name string, value float64, _ ...string) {
// Gauge records one gauge emission: its name, its value, and a private copy of
// its tags. The tags are copied so the stored entry does not alias the
// caller's variadic slice.
func (s *progressGaugeSpy) Gauge(name string, value float64, tags ...string) {
	recordedTags := append([]string(nil), tags...)
	s.tags = append(s.tags, recordedTags)
	s.values = append(s.values, value)
	s.names = append(s.names, name)
}

// Count is a no-op: it ignores its arguments and records nothing. It is
// present so progressGaugeSpy offers the Count method required by
// metrics.Emitter and can be assigned to MigrationContext.Metrics.
func (s *progressGaugeSpy) Count(name string, value int64, tags ...string) {
}

// Histogram is a no-op: it ignores its arguments and records nothing. It is
// present so progressGaugeSpy offers the Histogram method required by
// metrics.Emitter and can be assigned to MigrationContext.Metrics.
func (s *progressGaugeSpy) Histogram(name string, value float64, tags ...string) {
}

func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) {
Expand All @@ -427,24 +435,26 @@ func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) {

migrator := NewMigrator(ctx, "test")
migrator.reportStatus(NoPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, []string{
"row_copy.rows_copied",
"row_copy.rows_estimate",
"dml.events_applied",
"binlog.backlog_size",
"binlog.backlog_capacity",
"binlog.backlog_utilization",
"lag.replication_seconds",
"lag.heartbeat_seconds",
}, spy.names)
assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01)

spy.names = nil
spy.values = nil
atomic.StoreInt64(&ctx.TotalDMLEventsApplied, 100)
migrator.reportStatus(NoPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
require.Len(t, spy.names, 8)
assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
}

func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) {
Expand All @@ -463,7 +473,7 @@ func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) {
migrator.reportStatus(NoPrintStatusRule, io.Discard)

capacity := float64(cap(migrator.applyEventsQueue))
require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, float64(2), spy.values[3])
assert.Equal(t, capacity, spy.values[4])
assert.InDelta(t, 2/capacity, spy.values[5], 1e-9)
Expand All @@ -480,7 +490,7 @@ func TestReportStatusEmitsGaugesWhenRowCopyComplete(t *testing.T) {
atomic.StoreInt64(&migrator.rowCopyCompleteFlag, 1)
migrator.reportStatus(NoPrintStatusRule, io.Discard)

require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, float64(5000), spy.values[0])
assert.Equal(t, float64(5000), spy.values[1], "rows_estimate tracks rows_copied when row copy is complete")
}
Expand All @@ -500,8 +510,27 @@ func TestReportStatusEmitsGaugesWhenPrintSuppressed(t *testing.T) {
require.False(t, migrator.shouldPrintStatus(HeuristicPrintStatusRule, snap.elapsedSeconds, etaDuration))

migrator.reportStatus(HeuristicPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
require.Len(t, spy.names, 8)
assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
}

// TestReportStatusEmitsLagGaugesWhenThrottled verifies that reportStatus emits
// the replication-lag and heartbeat-lag gauges at positions 6 and 7 of the
// recorded gauge sequence, and that both carry the "throttled:true" tag while
// the migration context is marked as throttled.
func TestReportStatusEmitsLagGaugesWhenThrottled(t *testing.T) {
	spy := &progressGaugeSpy{}
	ctx := base.NewMigrationContext()
	ctx.Metrics = spy
	ctx.SetThrottled(true, "max-lag-millis", base.NoThrottleReasonHint)
	atomic.StoreInt64(&ctx.CurrentLag, int64(5*time.Second))
	ctx.SetLastHeartbeatOnChangelogTime(time.Now().Add(-4 * time.Second))

	migrator := NewMigrator(ctx, "test")
	migrator.reportStatus(NoPrintStatusRule, io.Discard)

	// Stop immediately if fewer than eight gauges were recorded; the indexed
	// assertions below would otherwise panic.
	require.GreaterOrEqual(t, len(spy.names), 8)
	assert.Equal(t, "lag.replication_seconds", spy.names[6])
	assert.Equal(t, "lag.heartbeat_seconds", spy.names[7])

	// Guard both tag slices before indexing into them so a missing tag is
	// reported as a test failure rather than an index-out-of-range panic.
	require.Len(t, spy.tags[6], 1)
	require.NotEmpty(t, spy.tags[7])
	assert.Equal(t, "throttled:true", spy.tags[6][0])
	assert.Equal(t, "throttled:true", spy.tags[7][0])
}

func TestMigratorShouldPrintStatus(t *testing.T) {
Expand Down
37 changes: 36 additions & 1 deletion go/logic/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/metrics"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
)
Expand All @@ -38,7 +39,10 @@ var (
}
)

const frenoMagicHint = "freno"
const (
// frenoMagicHint is the literal "freno".
// NOTE(review): its call sites are outside this view — presumably a marker
// value for freno-based throttling; confirm where it is used.
frenoMagicHint = "freno"
// throttleActiveMetricInterval is the minimum time between repeated emissions
// of the throttle-active gauge in recordThrottleMetrics while the throttle
// state is unchanged; a state change emits immediately regardless of it.
throttleActiveMetricInterval = time.Second
)

// Throttler collects metrics related to throttling and makes informed decision
// whether throttling should take place.
Expand All @@ -50,6 +54,10 @@ type Throttler struct {
httpClientTimeout time.Duration
inspector *Inspector
finishedMigrating int64

throttleStartedAt time.Time
throttleStartedReason string
throttleActiveEmitted time.Time
}

func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector, appVersion string) *Throttler {
Expand Down Expand Up @@ -463,6 +471,32 @@ func (thlr *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan
}()
}

// recordThrottleMetrics emits throttle metrics for a single throttle check.
//
// alreadyThrottling is the throttle state before this check, shouldThrottle is
// the state decided by this check, throttleReason is the reason reported with
// this check, and now is the time of the check.
//
// The throttle-active gauge is emitted when the state flips, when it has never
// been emitted, or when at least throttleActiveMetricInterval has elapsed since
// its last emission. When throttling begins, the start time and reason are
// remembered. When throttling ends and a start time was remembered, one
// interval metric is emitted with the elapsed duration and the remembered
// reason, after which the remembered start is cleared.
func (thlr *Throttler) recordThrottleMetrics(alreadyThrottling bool, shouldThrottle bool, throttleReason string, now time.Time) {
	entering := shouldThrottle && !alreadyThrottling
	leaving := alreadyThrottling && !shouldThrottle

	// A state flip always emits; otherwise emit on first use or once the
	// rate-limit interval has passed.
	gaugeDue := entering || leaving
	if !gaugeDue {
		gaugeDue = thlr.throttleActiveEmitted.IsZero() ||
			now.Sub(thlr.throttleActiveEmitted) >= throttleActiveMetricInterval
	}
	if gaugeDue {
		metrics.EmitThrottleActiveGauge(thlr.migrationContext.Metrics, shouldThrottle)
		thlr.throttleActiveEmitted = now
	}

	switch {
	case entering:
		// Remember when and why throttling began, for the interval metric
		// emitted on exit.
		thlr.throttleStartedAt = now
		thlr.throttleStartedReason = throttleReason
	case leaving && !thlr.throttleStartedAt.IsZero():
		elapsed := now.Sub(thlr.throttleStartedAt)
		metrics.EmitThrottleInterval(
			thlr.migrationContext.Metrics,
			elapsed,
			thlr.throttleStartedReason,
		)
		thlr.throttleStartedReason = ""
		thlr.throttleStartedAt = time.Time{}
	}
}

// initiateThrottlerChecks initiates the throttle ticker and sets the basic behavior of throttling.
func (thlr *Throttler) initiateThrottlerChecks() {
throttlerFunction := func() {
Expand All @@ -478,6 +512,7 @@ func (thlr *Throttler) initiateThrottlerChecks() {
// End of throttling
thlr.applier.WriteAndLogChangelog("throttle", "done throttling")
}
thlr.recordThrottleMetrics(alreadyThrottling, shouldThrottle, throttleReason, time.Now())
thlr.migrationContext.SetThrottled(shouldThrottle, throttleReason, throttleReasonHint)
}
throttlerFunction()
Expand Down
51 changes: 51 additions & 0 deletions go/logic/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/github/gh-ost/go/base"
)
Expand Down Expand Up @@ -108,3 +109,53 @@ func TestThrottleCallsOnThrottledCallback(t *testing.T) {
t.Fatal("throttle() did not return after context cancellation")
}
}

// throttleMetricsSpy is a test double that records every Gauge, Count, and
// Histogram call it receives. Names and values are kept per metric kind, while
// tags are appended to one shared slice in overall call order.
type throttleMetricsSpy struct {
// gaugeNames and gaugeValues hold the name and value of each Gauge call.
gaugeNames []string
gaugeValues []float64
// histogramNames and histogramValues hold the name and value of each Histogram call.
histogramNames []string
histogramValues []float64
// countNames and countValues hold the name and value of each Count call.
countNames []string
countValues []int64
// tags holds a copy of the tags of every call of any kind, in call order,
// so its indices do not line up with the per-kind slices above.
tags [][]string
}

// Gauge records the name and value of a gauge emission and appends a private
// copy of its tags to the shared tags log.
func (s *throttleMetricsSpy) Gauge(name string, value float64, tags ...string) {
	tagsCopy := append([]string(nil), tags...)
	s.tags = append(s.tags, tagsCopy)
	s.gaugeValues = append(s.gaugeValues, value)
	s.gaugeNames = append(s.gaugeNames, name)
}

// Count records the name and value of a counter emission and appends a private
// copy of its tags to the shared tags log.
func (s *throttleMetricsSpy) Count(name string, value int64, tags ...string) {
	tagsCopy := append([]string(nil), tags...)
	s.tags = append(s.tags, tagsCopy)
	s.countValues = append(s.countValues, value)
	s.countNames = append(s.countNames, name)
}

// Histogram records the name and value of a histogram emission and appends a
// private copy of its tags to the shared tags log.
func (s *throttleMetricsSpy) Histogram(name string, value float64, tags ...string) {
	tagsCopy := append([]string(nil), tags...)
	s.tags = append(s.tags, tagsCopy)
	s.histogramValues = append(s.histogramValues, value)
	s.histogramNames = append(s.histogramNames, name)
}

func TestRecordThrottleMetricsEmitsOneIntervalMetricOnThrottleExit(t *testing.T) {
thlr := newTestThrottler()
spy := &throttleMetricsSpy{}
thlr.migrationContext.Metrics = spy
startedAt := time.Unix(100, 0)

thlr.recordThrottleMetrics(false, true, "commanded by user", startedAt)
thlr.recordThrottleMetrics(true, true, "commanded by user", startedAt.Add(500*time.Millisecond))
thlr.recordThrottleMetrics(true, true, "commanded by user", startedAt.Add(1100*time.Millisecond))
thlr.recordThrottleMetrics(true, false, "", startedAt.Add(2500*time.Millisecond))

require.Equal(t, []string{"throttle.active", "throttle.active", "throttle.active"}, spy.gaugeNames)
require.Equal(t, []float64{1, 1, 0}, spy.gaugeValues)
require.Equal(t, []string{"throttle.duration_seconds"}, spy.histogramNames)
require.Equal(t, []float64{2.5}, spy.histogramValues)
require.Equal(t, []string{"throttle.events_total"}, spy.countNames)
require.Equal(t, []int64{1}, spy.countValues)
require.Len(t, spy.tags, 5)
assert.Equal(t, []string{"reason:commanded by user"}, spy.tags[3])
assert.Equal(t, []string{"reason:commanded by user"}, spy.tags[4])
}
31 changes: 0 additions & 31 deletions go/metrics/binlog_backlog.go

This file was deleted.

53 changes: 0 additions & 53 deletions go/metrics/binlog_backlog_test.go

This file was deleted.

14 changes: 14 additions & 0 deletions go/metrics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ type Client struct {
sd *statsd.Client
}

// Emitter is the metric-emitting surface used by callers. It is implemented by
// *Client, and tests can substitute their own implementation to observe
// emissions without sending UDP.
type Emitter interface {
// Gauge emits a gauge metric with the given name, value, and optional tags.
Gauge(name string, value float64, tags ...string)
// Count emits a counter metric with the given name, value, and optional tags.
Count(name string, value int64, tags ...string)
// Histogram emits a histogram sample with the given name, value, and optional tags.
Histogram(name string, value float64, tags ...string)
}

// NewClient connects to addr for StatsD. If addr is empty, returns Noop and nil error.
// namespace is typically "gh_ost." (metrics are named namespace + short name, e.g. gh_ost.startup).
// tags are global tags applied to every metric (repeatable --statsd-tags).
Expand Down Expand Up @@ -60,6 +67,13 @@ func (c *Client) Count(name string, value int64, tags ...string) {
_ = c.sd.Count(name, value, tags, 1.0)
}

// Histogram sends a histogram sample for name with the given value and tags,
// passing 1.0 as the final (rate) argument. It does nothing when the client
// has no underlying StatsD client, and any error returned by the send is
// deliberately discarded.
func (c *Client) Histogram(name string, value float64, tags ...string) {
	if c.sd != nil {
		_ = c.sd.Histogram(name, value, tags, 1.0)
	}
}

// Close flushes buffered metrics; safe for Noop.
func (c *Client) Close() error {
if c.sd == nil {
Expand Down
Loading
Loading