From d0a2ef8c75432b4b5636a62019255a55f75437aa Mon Sep 17 00:00:00 2001 From: Bastian Bartmann Date: Fri, 22 May 2026 11:11:53 +0200 Subject: [PATCH] =?UTF-8?q?Don=E2=80=99t=20parse=20binlog=20events=20of=20?= =?UTF-8?q?unrelated=20tables?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go/binlog/gomysql_reader.go | 50 ++++++++++++++----- go/binlog/gomysql_reader_test.go | 84 ++++++++++++++++++++++++++++++++ go/logic/streamer.go | 21 +++++++- go/logic/streamer_test.go | 28 +++++++++++ 4 files changed, 169 insertions(+), 14 deletions(-) create mode 100644 go/binlog/gomysql_reader_test.go diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 189a5f399..e752e359a 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -21,6 +21,24 @@ import ( uuid "github.com/google/uuid" ) +type RowsEventFilterFunc func(databaseName, tableName string) bool + +func newRowsEventDecodeFunc(rowsEventFilter RowsEventFilterFunc) func(*replication.RowsEvent, []byte) error { + if rowsEventFilter == nil { + return nil + } + return func(rowsEvent *replication.RowsEvent, data []byte) error { + pos, err := rowsEvent.DecodeHeader(data) + if err != nil { + return err + } + if !rowsEventFilter(string(rowsEvent.Table.Schema), string(rowsEvent.Table.Table)) { + return nil + } + return rowsEvent.DecodeData(pos, data) + } +} + type GoMySQLReader struct { migrationContext *base.MigrationContext connectionConfig *mysql.ConnectionConfig @@ -33,24 +51,30 @@ type GoMySQLReader struct { LastTrxCoords mysql.BinlogCoordinates } -func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader { +func NewGoMySQLReader(migrationContext *base.MigrationContext, rowsEventFilters ...RowsEventFilterFunc) *GoMySQLReader { connectionConfig := migrationContext.InspectorConnectionConfig + var rowsEventFilter RowsEventFilterFunc + if len(rowsEventFilters) > 0 { + rowsEventFilter = rowsEventFilters[0] + } + config := replication.BinlogSyncerConfig{ + ServerID: uint32(migrationContext.ReplicaServerId), + Flavor: gomysql.MySQLFlavor, + Host: connectionConfig.Key.Hostname, + Port: uint16(connectionConfig.Key.Port), + User: connectionConfig.User, + Password: connectionConfig.Password, + TLSConfig: connectionConfig.TLSConfig(), + UseDecimal: true, + TimestampStringLocation: time.UTC, + MaxReconnectAttempts: migrationContext.BinlogSyncerMaxReconnectAttempts, + } + config.RowsEventDecodeFunc = newRowsEventDecodeFunc(rowsEventFilter) return &GoMySQLReader{ migrationContext: migrationContext, connectionConfig: connectionConfig, currentCoordinatesMutex: &sync.Mutex{}, - binlogSyncer: replication.NewBinlogSyncer(replication.BinlogSyncerConfig{ - ServerID: uint32(migrationContext.ReplicaServerId), - Flavor: gomysql.MySQLFlavor, - Host: connectionConfig.Key.Hostname, - Port: uint16(connectionConfig.Key.Port), - User: connectionConfig.User, - Password: connectionConfig.Password, - TLSConfig: connectionConfig.TLSConfig(), - UseDecimal: true, - TimestampStringLocation: time.UTC, - MaxReconnectAttempts: migrationContext.BinlogSyncerMaxReconnectAttempts, - }), + binlogSyncer: replication.NewBinlogSyncer(config), } } diff --git a/go/binlog/gomysql_reader_test.go b/go/binlog/gomysql_reader_test.go new file mode 100644 index 000000000..6040716ad --- /dev/null +++ b/go/binlog/gomysql_reader_test.go @@ -0,0 +1,84 @@ +package binlog + +import ( + "reflect" + "testing" + "unsafe" + + gomysql "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" + "github.com/stretchr/testify/require" +) + +func setUnexportedField(target interface{}, fieldName string, value interface{}) { + field := reflect.ValueOf(target).Elem().FieldByName(fieldName) + reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Set(reflect.ValueOf(value)) +} + +func newTestRowsEvent(schemaName, tableName string) *replication.RowsEvent { + const tableID uint64 = 7 + + rowsEvent := &replication.RowsEvent{ + Version: 1, + } + setUnexportedField(rowsEvent, "tableIDSize", 6) + setUnexportedField(rowsEvent, "needBitmap2", false) + setUnexportedField(rowsEvent, "eventType", replication.WRITE_ROWS_EVENTv1) + setUnexportedField(rowsEvent, "tables", map[uint64]*replication.TableMapEvent{ + tableID: { + TableID: tableID, + Schema: []byte(schemaName), + Table: []byte(tableName), + ColumnCount: 1, + ColumnType: []byte{gomysql.MYSQL_TYPE_LONG}, + ColumnMeta: []uint16{0}, + NullBitmap: []byte{0x00}, + }, + }) + return rowsEvent +} + +func newRowsEventDataWithInvalidRowPayload() []byte { + // RowsEvent v1 header: + // - 6 byte little-endian table id + // - 2 byte flags + // - length-encoded column count = 1 + // - 1 byte column bitmap + // The final byte is an intentionally truncated row payload. DecodeHeader can + // parse the table/schema/name, but DecodeData will fail if it is invoked. + return []byte{7, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0} +} + +func TestRowsEventDecodeFuncSkipsRowDataForFilteredTable(t *testing.T) { + decodeFunc := newRowsEventDecodeFunc(func(databaseName, tableName string) bool { + require.Equal(t, "testdb", databaseName) + require.Equal(t, "ignored_table", tableName) + return false + }) + require.NotNil(t, decodeFunc) + + rowsEvent := newTestRowsEvent("testdb", "ignored_table") + err := decodeFunc(rowsEvent, newRowsEventDataWithInvalidRowPayload()) + require.NoError(t, err) + require.Equal(t, uint64(7), rowsEvent.TableID) + require.Equal(t, "testdb", string(rowsEvent.Table.Schema)) + require.Equal(t, "ignored_table", string(rowsEvent.Table.Table)) + require.Empty(t, rowsEvent.Rows) +} + +func TestRowsEventDecodeFuncDecodesRowDataForMatchingTable(t *testing.T) { + decodeFunc := newRowsEventDecodeFunc(func(databaseName, tableName string) bool { + require.Equal(t, "testdb", databaseName) + require.Equal(t, "wanted_table", tableName) + return true + }) + require.NotNil(t, decodeFunc) + + rowsEvent := newTestRowsEvent("testdb", "wanted_table") + err := decodeFunc(rowsEvent, newRowsEventDataWithInvalidRowPayload()) + require.Error(t, err) +} + +func TestRowsEventDecodeFuncIsNilWithoutFilter(t *testing.T) { + require.Nil(t, newRowsEventDecodeFunc(nil)) +} diff --git a/go/logic/streamer.go b/go/logic/streamer.go index ecb936069..f43dd2217 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -81,6 +81,25 @@ func (es *EventsStreamer) AddListener( return nil } +// shouldDecodeRowsEvent returns true when at least one listener is registered for +// the table on which the rows event operates. This is used by the binlog parser +// after it decodes the row event header/table map, but before it decodes row data. +func (es *EventsStreamer) shouldDecodeRowsEvent(databaseName, tableName string) bool { + es.listenersMutex.Lock() + defer es.listenersMutex.Unlock() + + for _, listener := range es.listeners { + if !strings.EqualFold(listener.databaseName, databaseName) { + continue + } + if !strings.EqualFold(listener.tableName, tableName) { + continue + } + return true + } + return false +} + // notifyListeners will notify relevant listeners with given DML event. Only // listeners registered for changes on the table on which the DML operates are notified. func (es *EventsStreamer) notifyListeners(binlogEntry *binlog.BinlogEntry) { @@ -129,7 +148,7 @@ func (es *EventsStreamer) InitDBConnections() (err error) { // initBinlogReader creates and connects the reader: we hook up to a MySQL server as a replica func (es *EventsStreamer) initBinlogReader(binlogCoordinates mysql.BinlogCoordinates) error { - goMySQLReader := binlog.NewGoMySQLReader(es.migrationContext) + goMySQLReader := binlog.NewGoMySQLReader(es.migrationContext, es.shouldDecodeRowsEvent) if err := goMySQLReader.ConnectBinlogStreamer(binlogCoordinates); err != nil { return err } diff --git a/go/logic/streamer_test.go b/go/logic/streamer_test.go index e8c0812d2..b887eb098 100644 --- a/go/logic/streamer_test.go +++ b/go/logic/streamer_test.go @@ -259,6 +259,34 @@ func (suite *EventsStreamerTestSuite) TestStreamEventsAutomaticallyReconnects() suite.Require().Len(dmlEvents, 3) } +func TestEventsStreamerShouldDecodeRowsEvent(t *testing.T) { + streamer := NewEventsStreamer(newTestMigrationContext()) + + if streamer.shouldDecodeRowsEvent(testMysqlDatabase, testMysqlTableName) { + t.Fatalf("expected no table match before any listeners are registered") + } + + err := streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogEntry) error { + return nil + }) + if err != nil { + t.Fatalf("unexpected AddListener error: %+v", err) + } + + if !streamer.shouldDecodeRowsEvent(testMysqlDatabase, testMysqlTableName) { + t.Fatalf("expected registered table to be decoded") + } + if !streamer.shouldDecodeRowsEvent("TEST", "TESTING") { + t.Fatalf("expected table matching to be case-insensitive") + } + if streamer.shouldDecodeRowsEvent(testMysqlDatabase, "other_table") { + t.Fatalf("expected unregistered table to be skipped") + } + if streamer.shouldDecodeRowsEvent("other_database", testMysqlTableName) { + t.Fatalf("expected unregistered database to be skipped") + } +} + func TestEventsStreamer(t *testing.T) { if testing.Short() { t.Skip("skipping events streamer test suite in short mode")