Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ type MigrationContext struct {
AbortError error
abortMutex *sync.Mutex

Metrics metrics.MemStatsGaugeEmitter
Metrics metrics.Emitter

OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
Expand Down
12 changes: 12 additions & 0 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/metrics"
"github.com/github/gh-ost/go/sql"

"context"
Expand Down Expand Up @@ -893,22 +894,27 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool
return hasFurtherRange, err
}

queryStartTime := time.Now()
rows, err := apl.db.Query(query, explodedArgs...)
if err != nil {
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err)
return hasFurtherRange, err
}
defer rows.Close()

iterationRangeMaxValues := sql.NewColumnValues(apl.migrationContext.UniqueKey.Len())
for rows.Next() {
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err)
return hasFurtherRange, err
}
hasFurtherRange = true
}
if err = rows.Err(); err != nil {
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), err)
return hasFurtherRange, err
}
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "source", "range_select", time.Since(queryStartTime), nil)
if hasFurtherRange {
apl.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
return hasFurtherRange, nil
Expand Down Expand Up @@ -956,7 +962,9 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i
if _, err := tx.Exec(sessionQuery); err != nil {
return nil, err
}
queryStartTime := time.Now()
result, err := tx.Exec(query, explodedArgs...)
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "chunk_copy", time.Since(queryStartTime), err)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1669,13 +1677,16 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e
// in the batch. SHOW WARNINGS only shows warnings from the last statement in a
// multi-statement query, so we interleave SHOW WARNINGS after each DML statement.
if apl.migrationContext.PanicOnWarnings {
queryStartTime := time.Now()
totalDelta, err = apl.executeBatchWithWarningChecking(ctx, tx, buildResults)
metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), err)
if err != nil {
return rollback(err)
}
} else {
// Fast path: batch together DML queries into multi-statements to minimize network trips.
// We use the raw driver connection to access the rows affected for each statement.
queryStartTime := time.Now()
execErr := conn.Raw(func(driverConn any) error {
ex := driverConn.(driver.ExecerContext)
nvc := driverConn.(driver.NamedValueChecker)
Expand Down Expand Up @@ -1709,6 +1720,7 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e
return nil
})

metrics.RecordQueryDuration(apl.migrationContext.Metrics, "target", "binlog_apply", time.Since(queryStartTime), execErr)
if execErr != nil {
return rollback(execErr)
}
Expand Down
4 changes: 4 additions & 0 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/metrics"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"

Expand Down Expand Up @@ -677,13 +678,16 @@ func (isp *Inspector) CountTableRows(ctx context.Context) error {

query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName))
var rowsEstimate int64
queryStartTime := time.Now()
if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil {
metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
isp.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err())
return mysql.Kill(isp.db, connectionID)
}
return err
}
metrics.RecordQueryDuration(isp.migrationContext.Metrics, "source", "row_count", time.Since(queryStartTime), nil)

// row count query finished. nil out the cancel func, so the main migration thread
// doesn't bother calling it after row copy is done.
Expand Down
9 changes: 8 additions & 1 deletion go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ func (mgtr *Migrator) initiateInspector() (err error) {
return nil
}

// emitProgressMetrics emits StatsD gauges from a progress snapshot.
// emitProgressMetrics emits StatsD gauges
func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) {
metrics.EmitProgressGauges(
mgtr.migrationContext.Metrics,
Expand All @@ -1181,6 +1181,13 @@ func (mgtr *Migrator) emitProgressMetrics(snap migrationProgressSnapshot) {
snap.applyEventsBacklog,
snap.applyEventsCapacity,
)
isThrottled, _, _ := mgtr.migrationContext.IsThrottled()
metrics.EmitLagGauges(
mgtr.migrationContext.Metrics,
snap.replicationLagSeconds,
snap.heartbeatLagSeconds,
isThrottled,
)
}

// reportStatus samples progress, emits metrics, and optionally prints status output.
Expand Down
47 changes: 38 additions & 9 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,19 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) {
type progressGaugeSpy struct {
names []string
values []float64
tags [][]string
}

func (s *progressGaugeSpy) Gauge(name string, value float64, _ ...string) {
func (s *progressGaugeSpy) Gauge(name string, value float64, tags ...string) {
s.names = append(s.names, name)
s.values = append(s.values, value)
s.tags = append(s.tags, append([]string(nil), tags...))
}

func (s *progressGaugeSpy) Count(name string, value int64, tags ...string) {
}

func (s *progressGaugeSpy) Histogram(name string, value float64, tags ...string) {
}

func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) {
Expand All @@ -427,24 +435,26 @@ func TestReportStatusEmitsProgressGaugesEveryTick(t *testing.T) {

migrator := NewMigrator(ctx, "test")
migrator.reportStatus(NoPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, []string{
"row_copy.rows_copied",
"row_copy.rows_estimate",
"dml.events_applied",
"binlog.backlog_size",
"binlog.backlog_capacity",
"binlog.backlog_utilization",
"lag.replication_seconds",
"lag.heartbeat_seconds",
}, spy.names)
assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
assert.Equal(t, []float64{1000, 5000, 42, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
assert.InDelta(t, 20.0, ctx.GetProgressPct(), 0.01)

spy.names = nil
spy.values = nil
atomic.StoreInt64(&ctx.TotalDMLEventsApplied, 100)
migrator.reportStatus(NoPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
require.Len(t, spy.names, 8)
assert.Equal(t, []float64{1000, 5000, 100, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
}

func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) {
Expand All @@ -463,7 +473,7 @@ func TestReportStatusEmitsBinlogBacklogGauges(t *testing.T) {
migrator.reportStatus(NoPrintStatusRule, io.Discard)

capacity := float64(cap(migrator.applyEventsQueue))
require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, float64(2), spy.values[3])
assert.Equal(t, capacity, spy.values[4])
assert.InDelta(t, 2/capacity, spy.values[5], 1e-9)
Expand All @@ -480,7 +490,7 @@ func TestReportStatusEmitsGaugesWhenRowCopyComplete(t *testing.T) {
atomic.StoreInt64(&migrator.rowCopyCompleteFlag, 1)
migrator.reportStatus(NoPrintStatusRule, io.Discard)

require.Len(t, spy.names, 6)
require.Len(t, spy.names, 8)
assert.Equal(t, float64(5000), spy.values[0])
assert.Equal(t, float64(5000), spy.values[1], "rows_estimate tracks rows_copied when row copy is complete")
}
Expand All @@ -500,8 +510,27 @@ func TestReportStatusEmitsGaugesWhenPrintSuppressed(t *testing.T) {
require.False(t, migrator.shouldPrintStatus(HeuristicPrintStatusRule, snap.elapsedSeconds, etaDuration))

migrator.reportStatus(HeuristicPrintStatusRule, io.Discard)
require.Len(t, spy.names, 6)
assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values)
require.Len(t, spy.names, 8)
assert.Equal(t, []float64{1000, 5000, 0, 0, float64(cap(migrator.applyEventsQueue)), 0}, spy.values[:6])
}

func TestReportStatusEmitsLagGaugesWhenThrottled(t *testing.T) {
spy := &progressGaugeSpy{}
ctx := base.NewMigrationContext()
ctx.Metrics = spy
ctx.SetThrottled(true, "max-lag-millis", base.NoThrottleReasonHint)
atomic.StoreInt64(&ctx.CurrentLag, int64(5*time.Second))
ctx.SetLastHeartbeatOnChangelogTime(time.Now().Add(-4 * time.Second))

migrator := NewMigrator(ctx, "test")
migrator.reportStatus(NoPrintStatusRule, io.Discard)

require.GreaterOrEqual(t, len(spy.names), 8)
assert.Equal(t, "lag.replication_seconds", spy.names[6])
assert.Equal(t, "lag.heartbeat_seconds", spy.names[7])
require.Len(t, spy.tags[6], 1)
assert.Equal(t, "throttled:true", spy.tags[6][0])
assert.Equal(t, "throttled:true", spy.tags[7][0])
}

func TestMigratorShouldPrintStatus(t *testing.T) {
Expand Down
31 changes: 0 additions & 31 deletions go/metrics/binlog_backlog.go

This file was deleted.

53 changes: 0 additions & 53 deletions go/metrics/binlog_backlog_test.go

This file was deleted.

14 changes: 14 additions & 0 deletions go/metrics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ type Client struct {
sd *statsd.Client
}

// Emitter is implemented by *Client; used for tests without UDP.
type Emitter interface {
Gauge(name string, value float64, tags ...string)
Count(name string, value int64, tags ...string)
Histogram(name string, value float64, tags ...string)
}

// NewClient connects to addr for StatsD. If addr is empty, returns Noop and nil error.
// namespace is typically "gh_ost." (metrics are named namespace + short name, e.g. gh_ost.startup).
// tags are global tags applied to every metric (repeatable --statsd-tags).
Expand Down Expand Up @@ -60,6 +67,13 @@ func (c *Client) Count(name string, value int64, tags ...string) {
_ = c.sd.Count(name, value, tags, 1.0)
}

func (c *Client) Histogram(name string, value float64, tags ...string) {
if c.sd == nil {
return
}
_ = c.sd.Histogram(name, value, tags, 1.0)
}

// Close flushes buffered metrics; safe for Noop.
func (c *Client) Close() error {
if c.sd == nil {
Expand Down
Loading
Loading