From d8d7cbad9d104b0d23220ad11dc902a6854325d5 Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Wed, 4 Mar 2026 13:47:07 +0100 Subject: [PATCH 1/7] wip - benchmark and optimize gtid --- go/binlog/gomysql_reader.go | 35 ++++-- go/binlog/streaming_bench_test.go | 201 ++++++++++++++++++++++++++++++ go/mysql/binlog_gtid.go | 53 ++++++++ 3 files changed, 281 insertions(+), 8 deletions(-) create mode 100644 go/binlog/streaming_bench_test.go diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index d690a9f65..b10123190 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -31,6 +31,16 @@ type GoMySQLReader struct { // LastTrxCoords are the coordinates of the last transaction completely read. // If using the file coordinates it is binlog position of the transaction's XID event. LastTrxCoords mysql.BinlogCoordinates + // currentTrxCoords is set once per GTIDEvent and shared by all RowsEvents within + // the same transaction. It points to currentCoordinates (a *LazyGTIDCoordinates), + // which is replaced at the next GTIDEvent — so old entries retain valid references. + // Only accessed from within the StreamEvents goroutine; no mutex needed. + currentTrxCoords mysql.BinlogCoordinates + // lastCommittedGTIDSet is the MysqlGTIDSet from the most recently seen XIDEvent + // (or the initial coordinates). It is immutable once set and used as the base for + // LazyGTIDCoordinates so we avoid cloning the full set on each GTIDEvent. + // Only written from within StreamEvents; no mutex needed. 
+ lastCommittedGTIDSet *gomysql.MysqlGTIDSet } func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader { @@ -68,6 +78,7 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin // Start sync with specified GTID set or binlog file and position if this.migrationContext.UseGTIDs { coords := coordinates.(*mysql.GTIDBinlogCoordinates) + this.lastCommittedGTIDSet = coords.GTIDSet this.binlogStreamer, err = this.binlogSyncer.StartSyncGTID(coords.GTIDSet) } else { coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates) @@ -86,7 +97,12 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates } func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error { - currentCoords := this.GetCurrentBinlogCoordinates() + var currentCoords mysql.BinlogCoordinates + if this.migrationContext.UseGTIDs && this.currentTrxCoords != nil { + currentCoords = this.currentTrxCoords + } else { + currentCoords = this.GetCurrentBinlogCoordinates() + } dml := ToEventDML(ev.Header.EventType.String()) if dml == NotDML { return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String()) @@ -167,12 +183,8 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha return err } this.currentCoordinatesMutex.Lock() - if this.LastTrxCoords != nil { - this.currentCoordinates = this.LastTrxCoords.Clone() - } - coords := this.currentCoordinates.(*mysql.GTIDBinlogCoordinates) - trxGset := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: event.GNO, Stop: event.GNO + 1}) - coords.GTIDSet.AddSet(trxGset) + this.currentCoordinates = mysql.NewLazyGTIDCoordinates(this.lastCommittedGTIDSet, sid, event.GNO) + this.currentTrxCoords = this.currentCoordinates this.currentCoordinatesMutex.Unlock() case *replication.RotateEvent: if this.migrationContext.UseGTIDs { @@ -185,7 +197,14 @@ func (this *GoMySQLReader) 
StreamEvents(canStopStreaming func() bool, entriesCha this.currentCoordinatesMutex.Unlock() case *replication.XIDEvent: if this.migrationContext.UseGTIDs { - this.LastTrxCoords = &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet.(*gomysql.MysqlGTIDSet)} + gSet := event.GSet.(*gomysql.MysqlGTIDSet) + if coords, ok := this.LastTrxCoords.(*mysql.GTIDBinlogCoordinates); ok { + coords.GTIDSet = gSet + coords.UUIDSet = nil + } else { + this.LastTrxCoords = &mysql.GTIDBinlogCoordinates{GTIDSet: gSet} + } + this.lastCommittedGTIDSet = gSet } else { this.LastTrxCoords = this.currentCoordinates.Clone() } diff --git a/go/binlog/streaming_bench_test.go b/go/binlog/streaming_bench_test.go new file mode 100644 index 000000000..3050062d3 --- /dev/null +++ b/go/binlog/streaming_bench_test.go @@ -0,0 +1,201 @@ +package binlog + +import ( + "fmt" + "io" + "os" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/mysql" + gomysql "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" + guuid "github.com/google/uuid" +) + +const ( + benchTxCount = 1_000 + benchRowsPerTx = 5 +) + +// 02b9e2cf-9c8a-11e7-a479-42010ae7009b — one of the real servers in the set +var benchServerSID = []byte{ + 0x02, 0xb9, 0xe2, 0xcf, 0x9c, 0x8a, 0x11, 0xe7, + 0xa4, 0x79, 0x42, 0x01, 0x0a, 0xe7, 0x00, 0x9b, +} + +func loadProductionGTIDSet(tb testing.TB) *gomysql.MysqlGTIDSet { + data, err := os.ReadFile("../../gtid_executed_shard21") + if err != nil { + tb.Fatalf("could not load gtid_executed_shard21: %v", err) + } + cleaned := strings.Join(strings.Fields(string(data)), ",") + set, err := gomysql.ParseMysqlGTIDSet(cleaned) + if err != nil { + tb.Fatalf("could not parse GTID set: %v", err) + } + return set.(*gomysql.MysqlGTIDSet) +} + +func buildGTIDEvents(initialSet *gomysql.MysqlGTIDSet) []*replication.BinlogEvent { + events := make([]*replication.BinlogEvent, 0, benchTxCount*(benchRowsPerTx+2)) + 
accSet := initialSet.Clone().(*gomysql.MysqlGTIDSet) + sid, _ := guuid.FromBytes(benchServerSID) + + for i := 0; i < benchTxCount; i++ { + gno := int64(73_590_714 + i) + + events = append(events, &replication.BinlogEvent{ + Header: &replication.EventHeader{EventType: replication.GTID_EVENT}, + Event: &replication.GTIDEvent{SID: benchServerSID, GNO: gno}, + }) + + for r := 0; r < benchRowsPerTx; r++ { + events = append(events, &replication.BinlogEvent{ + Header: &replication.EventHeader{ + EventType: replication.WRITE_ROWS_EVENTv2, + LogPos: uint32(i*1000 + r + 1), + EventSize: 100, + }, + Event: &replication.RowsEvent{ + Table: &replication.TableMapEvent{ + Schema: []byte("mydb"), + Table: []byte("orders"), + }, + Rows: [][]interface{}{{int64(i), "value"}}, + }, + }) + } + + trxGset := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: gno, Stop: gno + 1}) + accSet.AddSet(trxGset) + + events = append(events, &replication.BinlogEvent{ + Header: &replication.EventHeader{EventType: replication.XID_EVENT}, + Event: &replication.XIDEvent{GSet: accSet.Clone()}, + }) + } + return events +} + +func buildFileEvents() []*replication.BinlogEvent { + events := make([]*replication.BinlogEvent, 0, benchTxCount*(benchRowsPerTx+1)) + + for i := 0; i < benchTxCount; i++ { + for r := 0; r < benchRowsPerTx; r++ { + events = append(events, &replication.BinlogEvent{ + Header: &replication.EventHeader{ + EventType: replication.WRITE_ROWS_EVENTv2, + LogPos: uint32(i*1000 + r + 1), + EventSize: 100, + }, + Event: &replication.RowsEvent{ + Table: &replication.TableMapEvent{ + Schema: []byte("mydb"), + Table: []byte("orders"), + }, + Rows: [][]interface{}{{int64(i), "value"}}, + }, + }) + } + + events = append(events, &replication.BinlogEvent{ + Header: &replication.EventHeader{ + EventType: replication.XID_EVENT, + LogPos: uint32(i*1000 + benchRowsPerTx + 1), + }, + Event: &replication.XIDEvent{}, + }) + } + return events +} + +// feedAndRun feeds events into a fresh streamer concurrently 
with StreamEvents. +// This avoids b.N scaling issues caused by heavy pre-fill setup dominating over +// the (very fast) file-mode processing time. +func feedAndRun(b *testing.B, label string, useGTIDs bool, events []*replication.BinlogEvent, initialCoords mysql.BinlogCoordinates) { + b.ReportAllocs() + + var iterations atomic.Int64 + done := make(chan struct{}) + + go func() { + spinner := []string{"|", "/", "-", "\\"} + tick := time.NewTicker(500 * time.Millisecond) + defer tick.Stop() + frame := 0 + for { + select { + case <-done: + fmt.Fprintf(os.Stderr, "\r%-30s done (%d iters) \n", label, iterations.Load()) + return + case <-tick.C: + fmt.Fprintf(os.Stderr, "\r%-30s %s iter %d", label, spinner[frame%4], iterations.Load()) + frame++ + } + } + }() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Small channel — events flow through as StreamEvents consumes them. + s := replication.NewBinlogStreamer() + + ctx := &base.MigrationContext{} + ctx.UseGTIDs = useGTIDs + reader := &GoMySQLReader{ + migrationContext: ctx, + currentCoordinatesMutex: &sync.Mutex{}, + currentCoordinates: initialCoords.Clone(), + binlogStreamer: s, + } + entriesCh := make(chan *BinlogEntry, 100) + + // Feed events concurrently so AddEventToStreamer never blocks. + var feedDone sync.WaitGroup + feedDone.Add(1) + go func() { + defer feedDone.Done() + for _, ev := range events { + s.AddEventToStreamer(ev) + } + s.AddErrorToStreamer(io.EOF) + }() + + // Drain entries so StreamEvents never blocks writing to entriesCh. 
+ var drainDone sync.WaitGroup + drainDone.Add(1) + go func() { + defer drainDone.Done() + for range entriesCh { + } + }() + + reader.StreamEvents(func() bool { return false }, entriesCh) + feedDone.Wait() + close(entriesCh) + drainDone.Wait() + + iterations.Add(1) + } + + close(done) +} + +func BenchmarkStreamingGTID(b *testing.B) { + initialSet := loadProductionGTIDSet(b) + events := buildGTIDEvents(initialSet) + initialCoords := &mysql.GTIDBinlogCoordinates{GTIDSet: initialSet} + feedAndRun(b, "GTID (182 UUIDs)", true, events, initialCoords) +} + +func BenchmarkStreamingFile(b *testing.B) { + events := buildFileEvents() + initialCoords := &mysql.FileBinlogCoordinates{LogFile: "mysql-bin.000001", LogPos: 0} + feedAndRun(b, "File", false, events, initialCoords) +} diff --git a/go/mysql/binlog_gtid.go b/go/mysql/binlog_gtid.go index d7b86c04f..e161132ad 100644 --- a/go/mysql/binlog_gtid.go +++ b/go/mysql/binlog_gtid.go @@ -6,7 +6,10 @@ package mysql import ( + "fmt" + gomysql "github.com/go-mysql-org/go-mysql/mysql" + uuid "github.com/google/uuid" ) // GTIDBinlogCoordinates describe binary log coordinates in MySQL GTID format. @@ -85,3 +88,53 @@ func (this *GTIDBinlogCoordinates) Clone() BinlogCoordinates { } return out } + +// LazyGTIDCoordinates describes the in-flight coordinates of a transaction that +// has been announced via GTIDEvent but not yet committed (XIDEvent not yet seen). +// It holds a stable, immutable reference to the last-committed MysqlGTIDSet and +// the current transaction's GTID. The expensive Clone of the full set is deferred +// until Materialize is actually called, which only happens when external callers +// need a snapshot (via GetCurrentBinlogCoordinates) or when a comparison is made — +// not on every row event in the hot path. 
+type LazyGTIDCoordinates struct { + base *gomysql.MysqlGTIDSet // last-committed GTIDSet; immutable, not owned + sid uuid.UUID // current transaction's server UUID + gno int64 // current transaction's GNO +} + +// NewLazyGTIDCoordinates creates coordinates for an in-flight transaction. +// base must be the MysqlGTIDSet of the last committed transaction and must +// not be mutated after this call. +func NewLazyGTIDCoordinates(base *gomysql.MysqlGTIDSet, sid uuid.UUID, gno int64) *LazyGTIDCoordinates { + return &LazyGTIDCoordinates{base: base, sid: sid, gno: gno} +} + +// Materialize clones the base set, adds the in-flight GTID, and returns a full +// GTIDBinlogCoordinates. The result is an independent snapshot safe to hold across +// transaction boundaries. This is the only point where a MysqlGTIDSet.Clone occurs. +func (l *LazyGTIDCoordinates) Materialize() *GTIDBinlogCoordinates { + set := l.base.Clone().(*gomysql.MysqlGTIDSet) + set.AddGTID(l.sid, l.gno) + return &GTIDBinlogCoordinates{GTIDSet: set} +} + +func (l *LazyGTIDCoordinates) String() string { return l.Materialize().String() } +func (l *LazyGTIDCoordinates) DisplayString() string { return fmt.Sprintf("%s:%d", l.sid, l.gno) } +func (l *LazyGTIDCoordinates) IsEmpty() bool { return l.base == nil } + +func (l *LazyGTIDCoordinates) Equals(other BinlogCoordinates) bool { + return l.Materialize().Equals(other) +} + +func (l *LazyGTIDCoordinates) SmallerThan(other BinlogCoordinates) bool { + return l.Materialize().SmallerThan(other) +} + +func (l *LazyGTIDCoordinates) SmallerThanOrEquals(other BinlogCoordinates) bool { + return l.Materialize().SmallerThanOrEquals(other) +} + +// Clone materializes the full coordinates. The returned *GTIDBinlogCoordinates is +// an independent copy; callers receive a concrete type regardless of which +// BinlogCoordinates implementation produced it. 
+func (l *LazyGTIDCoordinates) Clone() BinlogCoordinates { return l.Materialize() } From e3acc617ffcae4c414b3f7ea3be1ab3c9f5d7c21 Mon Sep 17 00:00:00 2001 From: Patrick Begley Date: Wed, 4 Mar 2026 12:07:55 -0500 Subject: [PATCH 2/7] change round 1 --- go/binlog/binlog_dml_event.go | 26 ++++++++++++++++++++++++++ go/binlog/gomysql_reader.go | 26 +++++++++++++++----------- go/mysql/binlog_gtid.go | 11 +++++++++++ 3 files changed, 52 insertions(+), 11 deletions(-) diff --git a/go/binlog/binlog_dml_event.go b/go/binlog/binlog_dml_event.go index 2c7aa365d..6f96c2a7b 100644 --- a/go/binlog/binlog_dml_event.go +++ b/go/binlog/binlog_dml_event.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/github/gh-ost/go/sql" + "github.com/go-mysql-org/go-mysql/replication" ) type EventDML string @@ -21,6 +22,31 @@ const ( DeleteDML EventDML = "Delete" ) +// ToEventDMLFromType converts a binlog EventType directly to EventDML. +// This is more efficient than ToEventDML() as it avoids string allocation and parsing. 
+func ToEventDMLFromType(eventType replication.EventType) EventDML { + switch eventType { + case replication.WRITE_ROWS_EVENTv0, + replication.WRITE_ROWS_EVENTv1, + replication.WRITE_ROWS_EVENTv2, + replication.MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: + return InsertDML + case replication.UPDATE_ROWS_EVENTv0, + replication.UPDATE_ROWS_EVENTv1, + replication.UPDATE_ROWS_EVENTv2, + replication.MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1, + replication.PARTIAL_UPDATE_ROWS_EVENT: + return UpdateDML + case replication.DELETE_ROWS_EVENTv0, + replication.DELETE_ROWS_EVENTv1, + replication.DELETE_ROWS_EVENTv2, + replication.MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: + return DeleteDML + default: + return NotDML + } +} + func ToEventDML(description string) EventDML { // description can be a statement (`UPDATE my_table ...`) or a RBR event name (`UpdateRowsEventV2`) description = strings.TrimSpace(strings.Split(description, " ")[0]) diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index b10123190..febbd9d36 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -91,8 +91,8 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin } func (this *GoMySQLReader) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates { - this.currentCoordinatesMutex.Lock() - defer this.currentCoordinatesMutex.Unlock() + //this.currentCoordinatesMutex.Lock() + //defer this.currentCoordinatesMutex.Unlock() return this.currentCoordinates.Clone() } @@ -103,10 +103,17 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven } else { currentCoords = this.GetCurrentBinlogCoordinates() } - dml := ToEventDML(ev.Header.EventType.String()) + + // Use direct EventType switch instead of string conversion + dml := ToEventDMLFromType(ev.Header.EventType) if dml == NotDML { - return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String()) + return fmt.Errorf("Unknown DML type: %v", ev.Header.EventType) } + + // 
Convert schema and table names once per RowsEvent, not per row + schemaName := string(rowsEvent.Table.Schema) + tableName := string(rowsEvent.Table.Table) + for i, row := range rowsEvent.Rows { if dml == UpdateDML && i%2 == 1 { // An update has two rows (WHERE+SET) @@ -114,11 +121,8 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven continue } binlogEntry := NewBinlogEntryAt(currentCoords) - binlogEntry.DmlEvent = NewBinlogDMLEvent( - string(rowsEvent.Table.Schema), - string(rowsEvent.Table.Table), - dml, - ) + binlogEntry.DmlEvent = NewBinlogDMLEvent(schemaName, tableName, dml) + switch dml { case InsertDML: { @@ -182,10 +186,10 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha if err != nil { return err } - this.currentCoordinatesMutex.Lock() + //this.currentCoordinatesMutex.Lock() this.currentCoordinates = mysql.NewLazyGTIDCoordinates(this.lastCommittedGTIDSet, sid, event.GNO) this.currentTrxCoords = this.currentCoordinates - this.currentCoordinatesMutex.Unlock() + //this.currentCoordinatesMutex.Unlock() case *replication.RotateEvent: if this.migrationContext.UseGTIDs { continue diff --git a/go/mysql/binlog_gtid.go b/go/mysql/binlog_gtid.go index e161132ad..0961addb9 100644 --- a/go/mysql/binlog_gtid.go +++ b/go/mysql/binlog_gtid.go @@ -7,6 +7,7 @@ package mysql import ( "fmt" + "sync" gomysql "github.com/go-mysql-org/go-mysql/mysql" uuid "github.com/google/uuid" @@ -100,6 +101,9 @@ type LazyGTIDCoordinates struct { base *gomysql.MysqlGTIDSet // last-committed GTIDSet; immutable, not owned sid uuid.UUID // current transaction's server UUID gno int64 // current transaction's GNO + + cacheMutex sync.Mutex + cachedMaterialized *GTIDBinlogCoordinates } // NewLazyGTIDCoordinates creates coordinates for an in-flight transaction. @@ -113,6 +117,13 @@ func NewLazyGTIDCoordinates(base *gomysql.MysqlGTIDSet, sid uuid.UUID, gno int64 // GTIDBinlogCoordinates. 
The result is an independent snapshot safe to hold across // transaction boundaries. This is the only point where a MysqlGTIDSet.Clone occurs. func (l *LazyGTIDCoordinates) Materialize() *GTIDBinlogCoordinates { + l.cacheMutex.Lock() + defer l.cacheMutex.Unlock() + + if l.cachedMaterialized != nil { + return l.cachedMaterialized + } + set := l.base.Clone().(*gomysql.MysqlGTIDSet) set.AddGTID(l.sid, l.gno) return &GTIDBinlogCoordinates{GTIDSet: set} From 5e3887aa99baabf24ecebb11734498677bde9ae7 Mon Sep 17 00:00:00 2001 From: Patrick Begley Date: Thu, 5 Mar 2026 10:48:44 -0500 Subject: [PATCH 3/7] actually cache materialized gtid --- go/mysql/binlog_gtid.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/mysql/binlog_gtid.go b/go/mysql/binlog_gtid.go index 0961addb9..4bbe4f9bd 100644 --- a/go/mysql/binlog_gtid.go +++ b/go/mysql/binlog_gtid.go @@ -126,7 +126,8 @@ func (l *LazyGTIDCoordinates) Materialize() *GTIDBinlogCoordinates { set := l.base.Clone().(*gomysql.MysqlGTIDSet) set.AddGTID(l.sid, l.gno) - return &GTIDBinlogCoordinates{GTIDSet: set} + l.cachedMaterialized = &GTIDBinlogCoordinates{GTIDSet: set} + return l.cachedMaterialized } func (l *LazyGTIDCoordinates) String() string { return l.Materialize().String() } From 5c2ac00995a9f1cd56b9f68438f6ce558880ce27 Mon Sep 17 00:00:00 2001 From: Patrick Begley Date: Thu, 5 Mar 2026 15:17:37 -0500 Subject: [PATCH 4/7] cleanup --- go/binlog/binlog_dml_event.go | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/go/binlog/binlog_dml_event.go b/go/binlog/binlog_dml_event.go index 6f96c2a7b..2c7aa365d 100644 --- a/go/binlog/binlog_dml_event.go +++ b/go/binlog/binlog_dml_event.go @@ -10,7 +10,6 @@ import ( "strings" "github.com/github/gh-ost/go/sql" - "github.com/go-mysql-org/go-mysql/replication" ) type EventDML string @@ -22,31 +21,6 @@ const ( DeleteDML EventDML = "Delete" ) -// ToEventDMLFromType converts a binlog EventType directly to EventDML. 
-// This is more efficient than ToEventDML() as it avoids string allocation and parsing. -func ToEventDMLFromType(eventType replication.EventType) EventDML { - switch eventType { - case replication.WRITE_ROWS_EVENTv0, - replication.WRITE_ROWS_EVENTv1, - replication.WRITE_ROWS_EVENTv2, - replication.MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: - return InsertDML - case replication.UPDATE_ROWS_EVENTv0, - replication.UPDATE_ROWS_EVENTv1, - replication.UPDATE_ROWS_EVENTv2, - replication.MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1, - replication.PARTIAL_UPDATE_ROWS_EVENT: - return UpdateDML - case replication.DELETE_ROWS_EVENTv0, - replication.DELETE_ROWS_EVENTv1, - replication.DELETE_ROWS_EVENTv2, - replication.MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: - return DeleteDML - default: - return NotDML - } -} - func ToEventDML(description string) EventDML { // description can be a statement (`UPDATE my_table ...`) or a RBR event name (`UpdateRowsEventV2`) description = strings.TrimSpace(strings.Split(description, " ")[0]) From 66aef8505978a1d1633c0b1cec5a8fb1771957f3 Mon Sep 17 00:00:00 2001 From: Patrick Begley Date: Thu, 5 Mar 2026 16:08:26 -0500 Subject: [PATCH 5/7] cleanup more --- go/binlog/gomysql_reader.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index febbd9d36..9a004a04d 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -103,17 +103,16 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven } else { currentCoords = this.GetCurrentBinlogCoordinates() } - - // Use direct EventType switch instead of string conversion - dml := ToEventDMLFromType(ev.Header.EventType) + + dml := ToEventDML(ev.Header.EventType.String()) if dml == NotDML { return fmt.Errorf("Unknown DML type: %v", ev.Header.EventType) } - + // Convert schema and table names once per RowsEvent, not per row schemaName := string(rowsEvent.Table.Schema) tableName := 
string(rowsEvent.Table.Table) - + for i, row := range rowsEvent.Rows { if dml == UpdateDML && i%2 == 1 { // An update has two rows (WHERE+SET) @@ -122,7 +121,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven } binlogEntry := NewBinlogEntryAt(currentCoords) binlogEntry.DmlEvent = NewBinlogDMLEvent(schemaName, tableName, dml) - + switch dml { case InsertDML: { @@ -186,10 +185,8 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha if err != nil { return err } - //this.currentCoordinatesMutex.Lock() this.currentCoordinates = mysql.NewLazyGTIDCoordinates(this.lastCommittedGTIDSet, sid, event.GNO) this.currentTrxCoords = this.currentCoordinates - //this.currentCoordinatesMutex.Unlock() case *replication.RotateEvent: if this.migrationContext.UseGTIDs { continue From 338ee5a1be462388ebb347c6843b2d5116a6da22 Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Wed, 25 Mar 2026 18:40:47 +0100 Subject: [PATCH 6/7] refactor the lazy gtid types --- go/binlog/gomysql_reader.go | 45 +++++---- go/binlog/streaming_bench_test.go | 51 +++++----- go/mysql/binlog_gtid.go | 151 ++++++++++++------------------ 3 files changed, 115 insertions(+), 132 deletions(-) diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 9a004a04d..c924d1995 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -17,7 +17,7 @@ import ( gomysql "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" - uuid "github.com/google/uuid" + "github.com/google/uuid" "golang.org/x/net/context" ) @@ -32,15 +32,15 @@ type GoMySQLReader struct { // If using the file coordinates it is binlog position of the transaction's XID event. LastTrxCoords mysql.BinlogCoordinates // currentTrxCoords is set once per GTIDEvent and shared by all RowsEvents within - // the same transaction. It points to currentCoordinates (a *LazyGTIDCoordinates), + // the same transaction. 
It points to currentCoordinates (a lazy *GTIDBinlogCoordinates), // which is replaced at the next GTIDEvent — so old entries retain valid references. // Only accessed from within the StreamEvents goroutine; no mutex needed. currentTrxCoords mysql.BinlogCoordinates - // lastCommittedGTIDSet is the MysqlGTIDSet from the most recently seen XIDEvent - // (or the initial coordinates). It is immutable once set and used as the base for - // LazyGTIDCoordinates so we avoid cloning the full set on each GTIDEvent. - // Only written from within StreamEvents; no mutex needed. - lastCommittedGTIDSet *gomysql.MysqlGTIDSet + // lastCommittedCoords is the GTIDBinlogCoordinates from the most recently seen + // XIDEvent (or the initial coordinates). WithPendingGTID aliases its GTIDSet as + // the base for each new in-flight coord, so the Clone is deferred until actually + // needed. Only written from within StreamEvents; no mutex needed. + lastCommittedCoords *mysql.GTIDBinlogCoordinates } func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader { @@ -78,7 +78,7 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin // Start sync with specified GTID set or binlog file and position if this.migrationContext.UseGTIDs { coords := coordinates.(*mysql.GTIDBinlogCoordinates) - this.lastCommittedGTIDSet = coords.GTIDSet + this.lastCommittedCoords = coords this.binlogStreamer, err = this.binlogSyncer.StartSyncGTID(coords.GTIDSet) } else { coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates) @@ -91,8 +91,8 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin } func (this *GoMySQLReader) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates { - //this.currentCoordinatesMutex.Lock() - //defer this.currentCoordinatesMutex.Unlock() + this.currentCoordinatesMutex.Lock() + defer this.currentCoordinatesMutex.Unlock() return this.currentCoordinates.Clone() } @@ -185,8 +185,11 @@ func (this *GoMySQLReader) 
StreamEvents(canStopStreaming func() bool, entriesCha if err != nil { return err } - this.currentCoordinates = mysql.NewLazyGTIDCoordinates(this.lastCommittedGTIDSet, sid, event.GNO) - this.currentTrxCoords = this.currentCoordinates + pending := this.lastCommittedCoords.WithPendingGTID(sid, event.GNO) + this.currentCoordinatesMutex.Lock() + this.currentCoordinates = pending + this.currentCoordinatesMutex.Unlock() + this.currentTrxCoords = pending case *replication.RotateEvent: if this.migrationContext.UseGTIDs { continue @@ -198,14 +201,16 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha this.currentCoordinatesMutex.Unlock() case *replication.XIDEvent: if this.migrationContext.UseGTIDs { - gSet := event.GSet.(*gomysql.MysqlGTIDSet) - if coords, ok := this.LastTrxCoords.(*mysql.GTIDBinlogCoordinates); ok { - coords.GTIDSet = gSet - coords.UUIDSet = nil - } else { - this.LastTrxCoords = &mysql.GTIDBinlogCoordinates{GTIDSet: gSet} - } - this.lastCommittedGTIDSet = gSet + // go-mysql allocates a fresh MysqlGTIDSet for every XIDEvent it decodes + // from the binlog stream, so we can alias event.GSet directly without + // cloning it. The pointer is then shared by LastTrxCoords and + // lastCommittedCoords. lastCommittedCoords is subsequently used as the + // base inside WithPendingGTID: it is cloned there only if a comparison + // or string representation is actually requested, and never mutated. + // Any future code that modifies the set after this point must Clone first. 
+ committed := &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet.(*gomysql.MysqlGTIDSet)} + this.LastTrxCoords = committed + this.lastCommittedCoords = committed } else { this.LastTrxCoords = this.currentCoordinates.Clone() } diff --git a/go/binlog/streaming_bench_test.go b/go/binlog/streaming_bench_test.go index 3050062d3..047eb61a0 100644 --- a/go/binlog/streaming_bench_test.go +++ b/go/binlog/streaming_bench_test.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "os" - "strings" "sync" "sync/atomic" "testing" @@ -22,36 +21,41 @@ const ( benchRowsPerTx = 5 ) -// 02b9e2cf-9c8a-11e7-a479-42010ae7009b — one of the real servers in the set -var benchServerSID = []byte{ - 0x02, 0xb9, 0xe2, 0xcf, 0x9c, 0x8a, 0x11, 0xe7, - 0xa4, 0x79, 0x42, 0x01, 0x0a, 0xe7, 0x00, 0x9b, -} - -func loadProductionGTIDSet(tb testing.TB) *gomysql.MysqlGTIDSet { - data, err := os.ReadFile("../../gtid_executed_shard21") - if err != nil { - tb.Fatalf("could not load gtid_executed_shard21: %v", err) +const benchNumUUIDs = 182 + +// benchServerSID is a fixed synthetic UUID used as the "active" server in benchmarks. +var benchServerSID = guuid.MustParse("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee") + +// buildSyntheticGTIDSet generates a realistic-sized GTID set with numUUIDs unique +// server UUIDs, each with a transaction range of 1 to a varying upper bound. 
+func buildSyntheticGTIDSet(numUUIDs int) *gomysql.MysqlGTIDSet { + set := new(gomysql.MysqlGTIDSet) + set.Sets = make(map[string]*gomysql.UUIDSet) + for i := 0; i < numUUIDs; i++ { + // Deterministic UUIDs seeded from index + sid := guuid.MustParse(fmt.Sprintf("%08x-0000-4000-8000-%012x", i, i)) + gno := int64(1_000_000 + i*100_000) + uuidSet := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: 1, Stop: gno + 1}) + set.Sets[sid.String()] = uuidSet } - cleaned := strings.Join(strings.Fields(string(data)), ",") - set, err := gomysql.ParseMysqlGTIDSet(cleaned) - if err != nil { - tb.Fatalf("could not parse GTID set: %v", err) - } - return set.(*gomysql.MysqlGTIDSet) + // Include the benchmark server UUID so the initial set size stays consistent + // throughout the benchmark (no extra map entry created on first AddSet). + set.Sets[benchServerSID.String()] = gomysql.NewUUIDSet(benchServerSID, gomysql.Interval{Start: 1, Stop: 2}) + return set } func buildGTIDEvents(initialSet *gomysql.MysqlGTIDSet) []*replication.BinlogEvent { events := make([]*replication.BinlogEvent, 0, benchTxCount*(benchRowsPerTx+2)) accSet := initialSet.Clone().(*gomysql.MysqlGTIDSet) - sid, _ := guuid.FromBytes(benchServerSID) + sid := benchServerSID + sidBytes, _ := sid.MarshalBinary() for i := 0; i < benchTxCount; i++ { - gno := int64(73_590_714 + i) + gno := int64(i + 1) events = append(events, &replication.BinlogEvent{ Header: &replication.EventHeader{EventType: replication.GTID_EVENT}, - Event: &replication.GTIDEvent{SID: benchServerSID, GNO: gno}, + Event: &replication.GTIDEvent{SID: sidBytes, GNO: gno}, }) for r := 0; r < benchRowsPerTx; r++ { @@ -154,6 +158,9 @@ func feedAndRun(b *testing.B, label string, useGTIDs bool, events []*replication currentCoordinates: initialCoords.Clone(), binlogStreamer: s, } + if useGTIDs { + reader.lastCommittedCoords = initialCoords.(*mysql.GTIDBinlogCoordinates) + } entriesCh := make(chan *BinlogEntry, 100) // Feed events concurrently so AddEventToStreamer 
never blocks. @@ -188,10 +195,10 @@ func feedAndRun(b *testing.B, label string, useGTIDs bool, events []*replication } func BenchmarkStreamingGTID(b *testing.B) { - initialSet := loadProductionGTIDSet(b) + initialSet := buildSyntheticGTIDSet(benchNumUUIDs) events := buildGTIDEvents(initialSet) initialCoords := &mysql.GTIDBinlogCoordinates{GTIDSet: initialSet} - feedAndRun(b, "GTID (182 UUIDs)", true, events, initialCoords) + feedAndRun(b, fmt.Sprintf("GTID (%d UUIDs)", benchNumUUIDs), true, events, initialCoords) } func BenchmarkStreamingFile(b *testing.B) { diff --git a/go/mysql/binlog_gtid.go b/go/mysql/binlog_gtid.go index 4bbe4f9bd..3a516a3e2 100644 --- a/go/mysql/binlog_gtid.go +++ b/go/mysql/binlog_gtid.go @@ -7,16 +7,25 @@ package mysql import ( "fmt" - "sync" gomysql "github.com/go-mysql-org/go-mysql/mysql" uuid "github.com/google/uuid" ) // GTIDBinlogCoordinates describe binary log coordinates in MySQL GTID format. +// +// In committed mode (pendingGNO == 0): GTIDSet is the full, materialised set. +// +// In pending mode (pendingGNO != 0): GTIDSet is the base set from the last +// committed transaction; pendingSID:pendingGNO is the announced-but-not-yet- +// committed GTID. The expensive Clone is deferred until resolvedGTIDSet is called, +// which only happens when comparisons or string representations are needed — not on +// every row event in the hot path. type GTIDBinlogCoordinates struct { GTIDSet *gomysql.MysqlGTIDSet - UUIDSet *gomysql.UUIDSet + + pendingSID uuid.UUID // non-zero only in pending mode + pendingGNO int64 // non-zero only in pending mode } // NewGTIDBinlogCoordinates parses a MySQL GTID set into a *GTIDBinlogCoordinates struct. @@ -27,126 +36,88 @@ func NewGTIDBinlogCoordinates(gtidSet string) (*GTIDBinlogCoordinates, error) { }, err } -// DisplayString returns a user-friendly string representation of these current UUID set or the full GTID set. 
-func (this *GTIDBinlogCoordinates) DisplayString() string { - if this.UUIDSet != nil { - return this.UUIDSet.String() +// WithPendingGTID returns coordinates for a transaction that has been announced +// (via GTIDEvent) but not yet committed. g.GTIDSet is aliased directly as the base +// without cloning; the Clone is deferred until the coordinates are actually compared +// or stringified. g must not be mutated after this call. +func (g *GTIDBinlogCoordinates) WithPendingGTID(sid uuid.UUID, gno int64) *GTIDBinlogCoordinates { + return &GTIDBinlogCoordinates{GTIDSet: g.resolvedGTIDSet(), pendingSID: sid, pendingGNO: gno} +} + +// resolvedGTIDSet returns the effective MysqlGTIDSet. +// In committed mode (pendingGNO == 0) this is GTIDSet directly — no allocation. +// In pending mode it clones GTIDSet and adds the pending GTID. +func (g *GTIDBinlogCoordinates) resolvedGTIDSet() *gomysql.MysqlGTIDSet { + if g.pendingGNO != 0 { + set := g.GTIDSet.Clone().(*gomysql.MysqlGTIDSet) + set.AddGTID(g.pendingSID, g.pendingGNO) + return set } - return this.String() + return g.GTIDSet } -// String returns a user-friendly string representation of these full GTID set. -func (this GTIDBinlogCoordinates) String() string { - return this.GTIDSet.String() +// DisplayString returns a user-friendly string representation. +// In pending mode it returns sid:gno cheaply without cloning the full set. +func (g *GTIDBinlogCoordinates) DisplayString() string { + if g.pendingGNO != 0 { + return fmt.Sprintf("%s:%d", g.pendingSID, g.pendingGNO) + } + return g.String() +} + +// String returns the full GTID set string. +func (g GTIDBinlogCoordinates) String() string { + return g.resolvedGTIDSet().String() } // Equals tests equality of this coordinate and another one. 
-func (this *GTIDBinlogCoordinates) Equals(other BinlogCoordinates) bool { - if other == nil || this.IsEmpty() || other.IsEmpty() { +func (g *GTIDBinlogCoordinates) Equals(other BinlogCoordinates) bool { + if other == nil || g.IsEmpty() || other.IsEmpty() { return false } - otherCoords, ok := other.(*GTIDBinlogCoordinates) if !ok { return false } - - return this.GTIDSet.Equal(otherCoords.GTIDSet) + return g.resolvedGTIDSet().Equal(otherCoords.resolvedGTIDSet()) } // IsEmpty returns true if the GTID set is empty. -func (this *GTIDBinlogCoordinates) IsEmpty() bool { - return this.GTIDSet == nil +func (g *GTIDBinlogCoordinates) IsEmpty() bool { + return g.GTIDSet == nil } // SmallerThan returns true if this coordinate is strictly smaller than the other. -func (this *GTIDBinlogCoordinates) SmallerThan(other BinlogCoordinates) bool { - if other == nil || this.IsEmpty() || other.IsEmpty() { +func (g *GTIDBinlogCoordinates) SmallerThan(other BinlogCoordinates) bool { + if other == nil || g.IsEmpty() || other.IsEmpty() { return false } otherCoords, ok := other.(*GTIDBinlogCoordinates) if !ok { return false } - // if 'this' does not contain the same sets we assume we are behind 'other'. // there are probably edge cases where this isn't true - return !this.GTIDSet.Contain(otherCoords.GTIDSet) + return !g.resolvedGTIDSet().Contain(otherCoords.resolvedGTIDSet()) } // SmallerThanOrEquals returns true if this coordinate is the same or equal to the other one. -func (this *GTIDBinlogCoordinates) SmallerThanOrEquals(other BinlogCoordinates) bool { - return this.Equals(other) || this.SmallerThan(other) +func (g *GTIDBinlogCoordinates) SmallerThanOrEquals(other BinlogCoordinates) bool { + return g.Equals(other) || g.SmallerThan(other) } -func (this *GTIDBinlogCoordinates) Clone() BinlogCoordinates { +// Clone returns an independent, committed GTIDBinlogCoordinates snapshot. +// In pending mode the set is materialised and deep-copied in one pass. 
+func (g *GTIDBinlogCoordinates) Clone() BinlogCoordinates { out := &GTIDBinlogCoordinates{} - if this.GTIDSet != nil { - out.GTIDSet = this.GTIDSet.Clone().(*gomysql.MysqlGTIDSet) - } - if this.UUIDSet != nil { - out.UUIDSet = this.UUIDSet.Clone() + if g.GTIDSet != nil { + if g.pendingGNO != 0 { + set := g.GTIDSet.Clone().(*gomysql.MysqlGTIDSet) + set.AddGTID(g.pendingSID, g.pendingGNO) + out.GTIDSet = set + } else { + out.GTIDSet = g.GTIDSet.Clone().(*gomysql.MysqlGTIDSet) + } } return out } - -// LazyGTIDCoordinates describes the in-flight coordinates of a transaction that -// has been announced via GTIDEvent but not yet committed (XIDEvent not yet seen). -// It holds a stable, immutable reference to the last-committed MysqlGTIDSet and -// the current transaction's GTID. The expensive Clone of the full set is deferred -// until Materialize is actually called, which only happens when external callers -// need a snapshot (via GetCurrentBinlogCoordinates) or when a comparison is made — -// not on every row event in the hot path. -type LazyGTIDCoordinates struct { - base *gomysql.MysqlGTIDSet // last-committed GTIDSet; immutable, not owned - sid uuid.UUID // current transaction's server UUID - gno int64 // current transaction's GNO - - cacheMutex sync.Mutex - cachedMaterialized *GTIDBinlogCoordinates -} - -// NewLazyGTIDCoordinates creates coordinates for an in-flight transaction. -// base must be the MysqlGTIDSet of the last committed transaction and must -// not be mutated after this call. -func NewLazyGTIDCoordinates(base *gomysql.MysqlGTIDSet, sid uuid.UUID, gno int64) *LazyGTIDCoordinates { - return &LazyGTIDCoordinates{base: base, sid: sid, gno: gno} -} - -// Materialize clones the base set, adds the in-flight GTID, and returns a full -// GTIDBinlogCoordinates. The result is an independent snapshot safe to hold across -// transaction boundaries. This is the only point where a MysqlGTIDSet.Clone occurs. 
-func (l *LazyGTIDCoordinates) Materialize() *GTIDBinlogCoordinates { - l.cacheMutex.Lock() - defer l.cacheMutex.Unlock() - - if l.cachedMaterialized != nil { - return l.cachedMaterialized - } - - set := l.base.Clone().(*gomysql.MysqlGTIDSet) - set.AddGTID(l.sid, l.gno) - l.cachedMaterialized = &GTIDBinlogCoordinates{GTIDSet: set} - return l.cachedMaterialized -} - -func (l *LazyGTIDCoordinates) String() string { return l.Materialize().String() } -func (l *LazyGTIDCoordinates) DisplayString() string { return fmt.Sprintf("%s:%d", l.sid, l.gno) } -func (l *LazyGTIDCoordinates) IsEmpty() bool { return l.base == nil } - -func (l *LazyGTIDCoordinates) Equals(other BinlogCoordinates) bool { - return l.Materialize().Equals(other) -} - -func (l *LazyGTIDCoordinates) SmallerThan(other BinlogCoordinates) bool { - return l.Materialize().SmallerThan(other) -} - -func (l *LazyGTIDCoordinates) SmallerThanOrEquals(other BinlogCoordinates) bool { - return l.Materialize().SmallerThanOrEquals(other) -} - -// Clone materializes the full coordinates. The returned *GTIDBinlogCoordinates is -// an independent copy; callers receive a concrete type regardless of which -// BinlogCoordinates implementation produced it. 
-func (l *LazyGTIDCoordinates) Clone() BinlogCoordinates { return l.Materialize() } From f8972a467e6a65c757eb0fc9b0a8e7e6d38f328e Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Fri, 3 Apr 2026 17:30:13 +0200 Subject: [PATCH 7/7] encapsulate GTIDSet, cache materialization, clean up comments --- go/binlog/gomysql_reader.go | 15 ++---- go/binlog/streaming_bench_test.go | 2 +- go/logic/streamer.go | 2 +- go/mysql/binlog_file_test.go | 24 +++++----- go/mysql/binlog_gtid.go | 80 +++++++++++++------------------ 5 files changed, 52 insertions(+), 71 deletions(-) diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index c924d1995..743e58bea 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -79,7 +79,7 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin if this.migrationContext.UseGTIDs { coords := coordinates.(*mysql.GTIDBinlogCoordinates) this.lastCommittedCoords = coords - this.binlogStreamer, err = this.binlogSyncer.StartSyncGTID(coords.GTIDSet) + this.binlogStreamer, err = this.binlogSyncer.StartSyncGTID(coords.GTIDSet()) } else { coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates) this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{ @@ -109,7 +109,6 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven return fmt.Errorf("Unknown DML type: %v", ev.Header.EventType) } - // Convert schema and table names once per RowsEvent, not per row schemaName := string(rowsEvent.Table.Schema) tableName := string(rowsEvent.Table.Table) @@ -201,14 +200,10 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha this.currentCoordinatesMutex.Unlock() case *replication.XIDEvent: if this.migrationContext.UseGTIDs { - // go-mysql allocates a fresh MysqlGTIDSet for every XIDEvent it decodes - // from the binlog stream, so we can alias event.GSet directly without - // cloning it. 
The pointer is then shared by LastTrxCoords and - // lastCommittedCoords. lastCommittedCoords is subsequently used as the - // base inside WithPendingGTID: it is cloned there only if a comparison - // or string representation is actually requested, and never mutated. - // Any future code that modifies the set after this point must Clone first. - committed := &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet.(*gomysql.MysqlGTIDSet)} + // go-mysql allocates a fresh MysqlGTIDSet for every XIDEvent, so we can + // alias event.GSet directly. gtidSet is unexported; mutation from outside + // the mysql package is not possible. + committed := mysql.NewGTIDBinlogCoordinatesFromSet(event.GSet.(*gomysql.MysqlGTIDSet)) this.LastTrxCoords = committed this.lastCommittedCoords = committed } else { diff --git a/go/binlog/streaming_bench_test.go b/go/binlog/streaming_bench_test.go index 047eb61a0..fca5e83e3 100644 --- a/go/binlog/streaming_bench_test.go +++ b/go/binlog/streaming_bench_test.go @@ -197,7 +197,7 @@ func feedAndRun(b *testing.B, label string, useGTIDs bool, events []*replication func BenchmarkStreamingGTID(b *testing.B) { initialSet := buildSyntheticGTIDSet(benchNumUUIDs) events := buildGTIDEvents(initialSet) - initialCoords := &mysql.GTIDBinlogCoordinates{GTIDSet: initialSet} + initialCoords := mysql.NewGTIDBinlogCoordinatesFromSet(initialSet) feedAndRun(b, fmt.Sprintf("GTID (%d UUIDs)", benchNumUUIDs), true, events, initialCoords) } diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 1c2635138..d11bc5415 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -153,7 +153,7 @@ func (this *EventsStreamer) readCurrentBinlogCoordinates() error { if err != nil { return err } - this.initialBinlogCoordinates = &mysql.GTIDBinlogCoordinates{GTIDSet: gtidSet.(*gomysql.MysqlGTIDSet)} + this.initialBinlogCoordinates = mysql.NewGTIDBinlogCoordinatesFromSet(gtidSet.(*gomysql.MysqlGTIDSet)) } else { this.initialBinlogCoordinates = &mysql.FileBinlogCoordinates{ 
LogFile: m.GetString("File"), diff --git a/go/mysql/binlog_file_test.go b/go/mysql/binlog_file_test.go index 50d513698..5a85c946a 100644 --- a/go/mysql/binlog_file_test.go +++ b/go/mysql/binlog_file_test.go @@ -48,14 +48,14 @@ func TestBinlogCoordinates(t *testing.T) { 48e2bc1d-d66d-11e8-bf56-a0369f9437b8:1, 492e2980-4518-11e9-92c6-e4434b3eca94:1-4926754399`) - c5 := GTIDBinlogCoordinates{GTIDSet: gtidSet1.(*gomysql.MysqlGTIDSet)} - c6 := GTIDBinlogCoordinates{GTIDSet: gtidSet1.(*gomysql.MysqlGTIDSet)} - c7 := GTIDBinlogCoordinates{GTIDSet: gtidSet2.(*gomysql.MysqlGTIDSet)} - c8 := GTIDBinlogCoordinates{GTIDSet: gtidSet3.(*gomysql.MysqlGTIDSet)} - c9 := GTIDBinlogCoordinates{GTIDSet: gtidSetBig1.(*gomysql.MysqlGTIDSet)} - c10 := GTIDBinlogCoordinates{GTIDSet: gtidSetBig2.(*gomysql.MysqlGTIDSet)} - - require.True(t, c5.Equals(&c6)) + c5 := NewGTIDBinlogCoordinatesFromSet(gtidSet1.(*gomysql.MysqlGTIDSet)) + c6 := NewGTIDBinlogCoordinatesFromSet(gtidSet1.(*gomysql.MysqlGTIDSet)) + c7 := NewGTIDBinlogCoordinatesFromSet(gtidSet2.(*gomysql.MysqlGTIDSet)) + c8 := NewGTIDBinlogCoordinatesFromSet(gtidSet3.(*gomysql.MysqlGTIDSet)) + c9 := NewGTIDBinlogCoordinatesFromSet(gtidSetBig1.(*gomysql.MysqlGTIDSet)) + c10 := NewGTIDBinlogCoordinatesFromSet(gtidSetBig2.(*gomysql.MysqlGTIDSet)) + + require.True(t, c5.Equals(c6)) require.True(t, c1.Equals(&c2)) require.False(t, c1.Equals(&c3)) require.False(t, c1.Equals(&c4)) @@ -70,10 +70,10 @@ func TestBinlogCoordinates(t *testing.T) { require.True(t, c1.SmallerThanOrEquals(&c3)) require.True(t, c1.SmallerThanOrEquals(&c2)) require.True(t, c1.SmallerThanOrEquals(&c3)) - require.True(t, c6.SmallerThanOrEquals(&c7)) - require.True(t, c7.SmallerThanOrEquals(&c8)) - require.True(t, c9.SmallerThanOrEquals(&c9)) - require.True(t, c9.SmallerThanOrEquals(&c10)) + require.True(t, c6.SmallerThanOrEquals(c7)) + require.True(t, c7.SmallerThanOrEquals(c8)) + require.True(t, c9.SmallerThanOrEquals(c9)) + require.True(t, c9.SmallerThanOrEquals(c10)) 
} func TestBinlogCoordinatesAsKey(t *testing.T) { diff --git a/go/mysql/binlog_gtid.go b/go/mysql/binlog_gtid.go index 3a516a3e2..4e4411e40 100644 --- a/go/mysql/binlog_gtid.go +++ b/go/mysql/binlog_gtid.go @@ -7,57 +7,56 @@ package mysql import ( "fmt" + "sync" gomysql "github.com/go-mysql-org/go-mysql/mysql" uuid "github.com/google/uuid" ) // GTIDBinlogCoordinates describe binary log coordinates in MySQL GTID format. -// -// In committed mode (pendingGNO == 0): GTIDSet is the full, materialised set. -// -// In pending mode (pendingGNO != 0): GTIDSet is the base set from the last -// committed transaction; pendingSID:pendingGNO is the announced-but-not-yet- -// committed GTID. The expensive Clone is deferred until resolvedGTIDSet is called, -// which only happens when comparisons or string representations are needed — not on -// every row event in the hot path. +// In pending mode (pendingGNO != 0), gtidSet is the base from the last committed +// transaction and pendingSID:pendingGNO is the in-flight GTID. The materialized +// set is computed lazily and cached on first use. type GTIDBinlogCoordinates struct { - GTIDSet *gomysql.MysqlGTIDSet - - pendingSID uuid.UUID // non-zero only in pending mode - pendingGNO int64 // non-zero only in pending mode + gtidSet *gomysql.MysqlGTIDSet + pendingSID uuid.UUID + pendingGNO int64 + once sync.Once + resolved *gomysql.MysqlGTIDSet } -// NewGTIDBinlogCoordinates parses a MySQL GTID set into a *GTIDBinlogCoordinates struct. 
func NewGTIDBinlogCoordinates(gtidSet string) (*GTIDBinlogCoordinates, error) { set, err := gomysql.ParseMysqlGTIDSet(gtidSet) - return &GTIDBinlogCoordinates{ - GTIDSet: set.(*gomysql.MysqlGTIDSet), - }, err + return &GTIDBinlogCoordinates{gtidSet: set.(*gomysql.MysqlGTIDSet)}, err +} + +func NewGTIDBinlogCoordinatesFromSet(set *gomysql.MysqlGTIDSet) *GTIDBinlogCoordinates { + return &GTIDBinlogCoordinates{gtidSet: set} } -// WithPendingGTID returns coordinates for a transaction that has been announced -// (via GTIDEvent) but not yet committed. g.GTIDSet is aliased directly as the base -// without cloning; the Clone is deferred until the coordinates are actually compared -// or stringified. g must not be mutated after this call. +func (g *GTIDBinlogCoordinates) GTIDSet() *gomysql.MysqlGTIDSet { + return g.gtidSet +} + +// WithPendingGTID returns a new pending coordinate using g.gtidSet as the base. +// g.gtidSet is aliased without cloning; g must not be modified after this call. func (g *GTIDBinlogCoordinates) WithPendingGTID(sid uuid.UUID, gno int64) *GTIDBinlogCoordinates { - return &GTIDBinlogCoordinates{GTIDSet: g.resolvedGTIDSet(), pendingSID: sid, pendingGNO: gno} + return &GTIDBinlogCoordinates{gtidSet: g.resolvedGTIDSet(), pendingSID: sid, pendingGNO: gno} } -// resolvedGTIDSet returns the effective MysqlGTIDSet. -// In committed mode (pendingGNO == 0) this is GTIDSet directly — no allocation. -// In pending mode it clones GTIDSet and adds the pending GTID. func (g *GTIDBinlogCoordinates) resolvedGTIDSet() *gomysql.MysqlGTIDSet { if g.pendingGNO != 0 { - set := g.GTIDSet.Clone().(*gomysql.MysqlGTIDSet) - set.AddGTID(g.pendingSID, g.pendingGNO) - return set + g.once.Do(func() { + set := g.gtidSet.Clone().(*gomysql.MysqlGTIDSet) + set.AddGTID(g.pendingSID, g.pendingGNO) + g.resolved = set + }) + return g.resolved } - return g.GTIDSet + return g.gtidSet } -// DisplayString returns a user-friendly string representation. 
-// In pending mode it returns sid:gno cheaply without cloning the full set. +// DisplayString returns sid:gno in pending mode, otherwise the full GTID set string. func (g *GTIDBinlogCoordinates) DisplayString() string { if g.pendingGNO != 0 { return fmt.Sprintf("%s:%d", g.pendingSID, g.pendingGNO) @@ -65,12 +64,10 @@ func (g *GTIDBinlogCoordinates) DisplayString() string { return g.String() } -// String returns the full GTID set string. -func (g GTIDBinlogCoordinates) String() string { +func (g *GTIDBinlogCoordinates) String() string { return g.resolvedGTIDSet().String() } -// Equals tests equality of this coordinate and another one. func (g *GTIDBinlogCoordinates) Equals(other BinlogCoordinates) bool { if other == nil || g.IsEmpty() || other.IsEmpty() { return false @@ -82,12 +79,10 @@ func (g *GTIDBinlogCoordinates) Equals(other BinlogCoordinates) bool { return g.resolvedGTIDSet().Equal(otherCoords.resolvedGTIDSet()) } -// IsEmpty returns true if the GTID set is empty. func (g *GTIDBinlogCoordinates) IsEmpty() bool { - return g.GTIDSet == nil + return g.gtidSet == nil } -// SmallerThan returns true if this coordinate is strictly smaller than the other. func (g *GTIDBinlogCoordinates) SmallerThan(other BinlogCoordinates) bool { if other == nil || g.IsEmpty() || other.IsEmpty() { return false @@ -101,23 +96,14 @@ func (g *GTIDBinlogCoordinates) SmallerThan(other BinlogCoordinates) bool { return !g.resolvedGTIDSet().Contain(otherCoords.resolvedGTIDSet()) } -// SmallerThanOrEquals returns true if this coordinate is the same or equal to the other one. func (g *GTIDBinlogCoordinates) SmallerThanOrEquals(other BinlogCoordinates) bool { return g.Equals(other) || g.SmallerThan(other) } -// Clone returns an independent, committed GTIDBinlogCoordinates snapshot. -// In pending mode the set is materialised and deep-copied in one pass. 
func (g *GTIDBinlogCoordinates) Clone() BinlogCoordinates { out := &GTIDBinlogCoordinates{} - if g.GTIDSet != nil { - if g.pendingGNO != 0 { - set := g.GTIDSet.Clone().(*gomysql.MysqlGTIDSet) - set.AddGTID(g.pendingSID, g.pendingGNO) - out.GTIDSet = set - } else { - out.GTIDSet = g.GTIDSet.Clone().(*gomysql.MysqlGTIDSet) - } + if g.gtidSet != nil { + out.gtidSet = g.resolvedGTIDSet().Clone().(*gomysql.MysqlGTIDSet) } return out }