Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 0 additions & 31 deletions go/metrics/binlog_backlog.go

This file was deleted.

53 changes: 0 additions & 53 deletions go/metrics/binlog_backlog_test.go

This file was deleted.

108 changes: 108 additions & 0 deletions go/metrics/emit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
Copyright 2026 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package metrics

import (
"context"
"fmt"
"runtime"
"time"
)

// EmitProgressGauges emits row-copy and DML progress gauges (namespace is applied by the client):
// gh_ost.row_copy.rows_copied, gh_ost.row_copy.rows_estimate, gh_ost.dml.events_applied.
func EmitProgressGauges(emit Emitter, rowsCopied, rowsEstimate, dmlEventsApplied int64) {
if emit == nil {
return
}
emit.Gauge("row_copy.rows_copied", float64(rowsCopied))
emit.Gauge("row_copy.rows_estimate", float64(rowsEstimate))
emit.Gauge("dml.events_applied", float64(dmlEventsApplied))
}

// EmitBinlogBacklogGauges emits apply-events queue depth gauges (namespace is applied by the client):
// gh_ost.binlog.backlog_size, gh_ost.binlog.backlog_capacity, gh_ost.binlog.backlog_utilization.
func EmitBinlogBacklogGauges(emit Emitter, backlogSize, backlogCapacity int) {
if emit == nil {
return
}
emit.Gauge("binlog.backlog_size", float64(backlogSize))
emit.Gauge("binlog.backlog_capacity", float64(backlogCapacity))
emit.Gauge("binlog.backlog_utilization", binlogBacklogUtilization(backlogSize, backlogCapacity))
}

func binlogBacklogUtilization(backlogSize, backlogCapacity int) float64 {
if backlogCapacity <= 0 {
return 0
}
utilization := float64(backlogSize) / float64(backlogCapacity)
if utilization > 1 {
return 1
}
if utilization < 0 {
return 0
}
return utilization
}

// EmitLagGauges emits replication and heartbeat lag gauges (namespace is applied by the client):
// gh_ost.lag.replication_seconds, gh_ost.lag.heartbeat_seconds, each tagged throttled:true|false.
//
// These are point-in-time readings each status tick (not a distribution), so gauges are used
// rather than histograms; DogStatsD histogram aggregation exposes count/max series that do not
// match the log line lag values in Prometheus/Grafana.
func EmitLagGauges(emit Emitter, replicationLagSeconds, heartbeatLagSeconds float64, throttled bool) {
if emit == nil {
return
}
tags := []string{fmt.Sprintf("throttled:%t", throttled)}
emit.Gauge("lag.replication_seconds", replicationLagSeconds, tags...)
emit.Gauge("lag.heartbeat_seconds", heartbeatLagSeconds, tags...)
}

// EmitGoRuntimeGauges emits gh_ost.go_runtime.* gauges (namespace is applied by the client).
// m and numGoroutine are typically from runtime.ReadMemStats and runtime.NumGoroutine.
func EmitGoRuntimeGauges(emit Emitter, m *runtime.MemStats, numGoroutine int) {
if emit == nil || m == nil {
return
}
emit.Gauge("go_runtime.alloc_bytes", float64(m.Alloc))
emit.Gauge("go_runtime.sys_bytes", float64(m.Sys))
emit.Gauge("go_runtime.heap_inuse_bytes", float64(m.HeapInuse))
emit.Gauge("go_runtime.num_gc", float64(m.NumGC))
emit.Gauge("go_runtime.gc_pause_total_ns", float64(m.PauseTotalNs))
emit.Gauge("go_runtime.goroutines", float64(numGoroutine))
}

// StartGoRuntimeReporter periodically samples runtime memory and goroutines and emits gauges
// until ctx is cancelled. It is a no-op when interval <= 0, client is nil, or StatsD is disabled
// (noop client).
func StartGoRuntimeReporter(ctx context.Context, client *Client, interval time.Duration) {
if ctx == nil || client == nil || interval <= 0 || client.sd == nil {
return
}

emit := func() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
EmitGoRuntimeGauges(client, &m, runtime.NumGoroutine())
}

go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()

emit()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
emit()
}
}
}()
}
184 changes: 184 additions & 0 deletions go/metrics/emit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
Copyright 2026 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package metrics

import (
"context"
"runtime"
"testing"
"time"
)

type gaugeSpy struct {
names []string
values []float64
tags [][]string
}

func (g *gaugeSpy) Gauge(name string, value float64, tags ...string) {
g.names = append(g.names, name)
g.values = append(g.values, value)
g.tags = append(g.tags, append([]string(nil), tags...))
}

func (g *gaugeSpy) Count(name string, value int64, tags ...string) {
}

func (g *gaugeSpy) Histogram(name string, value float64, tags ...string) {
}

func TestEmitProgressGauges(t *testing.T) {
spy := &gaugeSpy{}
EmitProgressGauges(spy, 1000, 5000, 42)

wantNames := []string{
"row_copy.rows_copied",
"row_copy.rows_estimate",
"dml.events_applied",
}
wantVals := []float64{1000, 5000, 42}

if len(spy.names) != len(wantNames) {
t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames))
}
for i := range wantNames {
if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] {
t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i])
}
}
}

func TestEmitProgressGauges_nilSafe(t *testing.T) {
EmitProgressGauges(nil, 1, 2, 3)
}

func TestEmitBinlogBacklogGauges(t *testing.T) {
spy := &gaugeSpy{}
EmitBinlogBacklogGauges(spy, 250, 1000)

wantNames := []string{
"binlog.backlog_size",
"binlog.backlog_capacity",
"binlog.backlog_utilization",
}
wantVals := []float64{250, 1000, 0.25}

if len(spy.names) != len(wantNames) {
t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames))
}
for i := range wantNames {
if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] {
t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i])
}
}
}

func TestEmitBinlogBacklogGauges_nilSafe(t *testing.T) {
EmitBinlogBacklogGauges(nil, 1, 2)
}

func TestBinlogBacklogUtilization(t *testing.T) {
tests := []struct {
size, capacity int
want float64
}{
{0, 1000, 0},
{250, 1000, 0.25},
{1000, 1000, 1},
{1500, 1000, 1},
{-1, 1000, 0},
{10, 0, 0},
}
for _, tt := range tests {
got := binlogBacklogUtilization(tt.size, tt.capacity)
if got != tt.want {
t.Fatalf("utilization(%d, %d) = %v, want %v", tt.size, tt.capacity, got, tt.want)
}
}
}

func TestEmitLagGauges_notThrottled(t *testing.T) {
spy := &gaugeSpy{}
EmitLagGauges(spy, 2.5, 1.25, false)

wantNames := []string{"lag.replication_seconds", "lag.heartbeat_seconds"}
wantVals := []float64{2.5, 1.25}
wantTags := []string{"throttled:false"}

if len(spy.names) != len(wantNames) {
t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames))
}
for i := range wantNames {
if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] {
t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i])
}
if len(spy.tags[i]) != 1 || spy.tags[i][0] != wantTags[0] {
t.Fatalf("[%d] got tags %v want [%s]", i, spy.tags[i], wantTags[0])
}
}
}

func TestEmitLagGauges_throttled(t *testing.T) {
spy := &gaugeSpy{}
EmitLagGauges(spy, 4.0, 3.0, true)

if len(spy.names) != 2 {
t.Fatalf("got %d gauges, want 2", len(spy.names))
}
for i := range spy.names {
if len(spy.tags[i]) != 1 || spy.tags[i][0] != "throttled:true" {
t.Fatalf("[%d] got tags %v want [throttled:true]", i, spy.tags[i])
}
}
}

func TestEmitLagGauges_nilSafe(t *testing.T) {
EmitLagGauges(nil, 1, 2, false)
}

func TestEmitGoRuntimeGauges(t *testing.T) {
spy := &gaugeSpy{}
m := &runtime.MemStats{
Alloc: 100,
Sys: 200,
HeapInuse: 300,
NumGC: 7,
PauseTotalNs: 42,
}
EmitGoRuntimeGauges(spy, m, 123)

wantNames := []string{
"go_runtime.alloc_bytes",
"go_runtime.sys_bytes",
"go_runtime.heap_inuse_bytes",
"go_runtime.num_gc",
"go_runtime.gc_pause_total_ns",
"go_runtime.goroutines",
}
wantVals := []float64{100, 200, 300, 7, 42, 123}

if len(spy.names) != len(wantNames) {
t.Fatalf("got %d gauges, want %d", len(spy.names), len(wantNames))
}
for i := range wantNames {
if spy.names[i] != wantNames[i] || spy.values[i] != wantVals[i] {
t.Fatalf("[%d] got %s=%v want %s=%v", i, spy.names[i], spy.values[i], wantNames[i], wantVals[i])
}
}
}

func TestEmitGoRuntimeGauges_nilSafe(t *testing.T) {
EmitGoRuntimeGauges(nil, &runtime.MemStats{}, 1)
EmitGoRuntimeGauges(&gaugeSpy{}, nil, 1)
}

func TestStartGoRuntimeReporter_stopsOnCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
c := &Client{} // sd nil, so the reporter should not start.
StartGoRuntimeReporter(ctx, c, time.Millisecond)
cancel()
time.Sleep(20 * time.Millisecond)
}
Loading
Loading