Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@ import (
uuid "github.com/google/uuid"
)

// RowsEventFilterFunc reports whether the row data of a rows event on the given
// database and table should be decoded. newRowsEventDecodeFunc decodes row data
// only when the filter returns true.
type RowsEventFilterFunc func(databaseName, tableName string) bool

// newRowsEventDecodeFunc adapts a table-level filter into a rows-event decode
// function.
//
// A nil filter yields a nil decode function. Otherwise the returned function
// decodes the event header, passes the event's schema and table names to the
// filter, and decodes the row data only when the filter returns true. Events
// the filter rejects return nil without their row data being decoded; a header
// decoding error is returned unchanged.
func newRowsEventDecodeFunc(rowsEventFilter RowsEventFilterFunc) func(*replication.RowsEvent, []byte) error {
	if rowsEventFilter != nil {
		return func(event *replication.RowsEvent, payload []byte) error {
			offset, headerErr := event.DecodeHeader(payload)
			if headerErr != nil {
				return headerErr
			}
			schema, table := string(event.Table.Schema), string(event.Table.Table)
			if rowsEventFilter(schema, table) {
				return event.DecodeData(offset, payload)
			}
			// Filtered out: the header is decoded, the rows are left untouched.
			return nil
		}
	}
	return nil
}

type GoMySQLReader struct {
migrationContext *base.MigrationContext
connectionConfig *mysql.ConnectionConfig
Expand All @@ -33,24 +51,30 @@ type GoMySQLReader struct {
LastTrxCoords mysql.BinlogCoordinates
}

func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader {
func NewGoMySQLReader(migrationContext *base.MigrationContext, rowsEventFilters ...RowsEventFilterFunc) *GoMySQLReader {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it strange that we're using a variadic parameter here when we're always passing exactly 1 argument: never 0, never more than 1.

Copy link
Copy Markdown
Author

@coding-chimp coding-chimp May 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need it ourselves but it does provide backwards compatibility for external callers. I'm not sure whether we (or the gh-ost maintainers) should care about this, but PRs like this one do give the impression there are people using gh-ost as a library.

connectionConfig := migrationContext.InspectorConnectionConfig
var rowsEventFilter RowsEventFilterFunc
if len(rowsEventFilters) > 0 {
rowsEventFilter = rowsEventFilters[0]
}
config := replication.BinlogSyncerConfig{
ServerID: uint32(migrationContext.ReplicaServerId),
Flavor: gomysql.MySQLFlavor,
Host: connectionConfig.Key.Hostname,
Port: uint16(connectionConfig.Key.Port),
User: connectionConfig.User,
Password: connectionConfig.Password,
TLSConfig: connectionConfig.TLSConfig(),
UseDecimal: true,
TimestampStringLocation: time.UTC,
MaxReconnectAttempts: migrationContext.BinlogSyncerMaxReconnectAttempts,
}
config.RowsEventDecodeFunc = newRowsEventDecodeFunc(rowsEventFilter)
return &GoMySQLReader{
migrationContext: migrationContext,
connectionConfig: connectionConfig,
currentCoordinatesMutex: &sync.Mutex{},
binlogSyncer: replication.NewBinlogSyncer(replication.BinlogSyncerConfig{
ServerID: uint32(migrationContext.ReplicaServerId),
Flavor: gomysql.MySQLFlavor,
Host: connectionConfig.Key.Hostname,
Port: uint16(connectionConfig.Key.Port),
User: connectionConfig.User,
Password: connectionConfig.Password,
TLSConfig: connectionConfig.TLSConfig(),
UseDecimal: true,
TimestampStringLocation: time.UTC,
MaxReconnectAttempts: migrationContext.BinlogSyncerMaxReconnectAttempts,
}),
binlogSyncer: replication.NewBinlogSyncer(config),
}
}

Expand Down
84 changes: 84 additions & 0 deletions go/binlog/gomysql_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package binlog

import (
"reflect"
"testing"
"unsafe"

gomysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/stretchr/testify/require"
)

// setUnexportedField overwrites the field called fieldName on the struct that
// target points to, even when that field is unexported. target must be a
// pointer to a struct, and value must be assignable to the field's type.
func setUnexportedField(target interface{}, fieldName string, value interface{}) {
	structValue := reflect.ValueOf(target).Elem()
	readOnlyField := structValue.FieldByName(fieldName)
	// A value reached through an unexported field cannot be set directly, so
	// build a fresh, settable value of the same type at the field's address.
	fieldAddr := unsafe.Pointer(readOnlyField.UnsafeAddr())
	settableField := reflect.NewAt(readOnlyField.Type(), fieldAddr).Elem()
	settableField.Set(reflect.ValueOf(value))
}

// newTestRowsEvent builds a version 1 WRITE_ROWS rows event for tests. Its
// unexported table map holds a single entry, keyed by table id 7, describing a
// one-column (MYSQL_TYPE_LONG) table with the given schema and table names.
func newTestRowsEvent(schemaName, tableName string) *replication.RowsEvent {
	const tableID uint64 = 7

	tableMap := &replication.TableMapEvent{
		TableID:     tableID,
		Schema:      []byte(schemaName),
		Table:       []byte(tableName),
		ColumnCount: 1,
		ColumnType:  []byte{gomysql.MYSQL_TYPE_LONG},
		ColumnMeta:  []uint16{0},
		NullBitmap:  []byte{0x00},
	}

	event := &replication.RowsEvent{Version: 1}
	// These fields are unexported in the replication package, so they are
	// populated through reflection.
	setUnexportedField(event, "tables", map[uint64]*replication.TableMapEvent{tableID: tableMap})
	setUnexportedField(event, "eventType", replication.WRITE_ROWS_EVENTv1)
	setUnexportedField(event, "needBitmap2", false)
	setUnexportedField(event, "tableIDSize", 6)
	return event
}

// newRowsEventDataWithInvalidRowPayload returns the raw body of a RowsEvent v1
// whose header is well-formed but whose row payload is deliberately truncated.
// DecodeHeader can parse the table/schema/name from it, but DecodeData will
// fail if it is invoked.
func newRowsEventDataWithInvalidRowPayload() []byte {
	return []byte{
		// 6 byte little-endian table id (7)
		7, 0, 0, 0, 0, 0,
		// 2 byte flags
		0, 0,
		// length-encoded column count = 1
		1,
		// 1 byte column bitmap
		1,
		// intentionally truncated row payload
		0,
	}
}

func TestRowsEventDecodeFuncSkipsRowDataForFilteredTable(t *testing.T) {
decodeFunc := newRowsEventDecodeFunc(func(databaseName, tableName string) bool {
require.Equal(t, "testdb", databaseName)
require.Equal(t, "ignored_table", tableName)
return false
})
require.NotNil(t, decodeFunc)

rowsEvent := newTestRowsEvent("testdb", "ignored_table")
err := decodeFunc(rowsEvent, newRowsEventDataWithInvalidRowPayload())
require.NoError(t, err)
require.Equal(t, uint64(7), rowsEvent.TableID)
require.Equal(t, "testdb", string(rowsEvent.Table.Schema))
require.Equal(t, "ignored_table", string(rowsEvent.Table.Table))
require.Empty(t, rowsEvent.Rows)
}

func TestRowsEventDecodeFuncDecodesRowDataForMatchingTable(t *testing.T) {
decodeFunc := newRowsEventDecodeFunc(func(databaseName, tableName string) bool {
require.Equal(t, "testdb", databaseName)
require.Equal(t, "wanted_table", tableName)
return true
})
require.NotNil(t, decodeFunc)

rowsEvent := newTestRowsEvent("testdb", "wanted_table")
err := decodeFunc(rowsEvent, newRowsEventDataWithInvalidRowPayload())
require.Error(t, err)
}

func TestRowsEventDecodeFuncIsNilWithoutFilter(t *testing.T) {
require.Nil(t, newRowsEventDecodeFunc(nil))
}
21 changes: 20 additions & 1 deletion go/logic/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,25 @@ func (es *EventsStreamer) AddListener(
return nil
}

// shouldDecodeRowsEvent reports whether any registered listener is interested
// in the given table. Database and table names are compared case-insensitively.
// The binlog parser consults it once the row event header/table map has been
// decoded, to decide whether the row data is worth decoding at all.
func (es *EventsStreamer) shouldDecodeRowsEvent(databaseName, tableName string) bool {
	es.listenersMutex.Lock()
	defer es.listenersMutex.Unlock()

	for _, registered := range es.listeners {
		if strings.EqualFold(registered.databaseName, databaseName) &&
			strings.EqualFold(registered.tableName, tableName) {
			return true
		}
	}
	return false
}

// notifyListeners will notify relevant listeners with given DML event. Only
// listeners registered for changes on the table on which the DML operates are notified.
func (es *EventsStreamer) notifyListeners(binlogEntry *binlog.BinlogEntry) {
Expand Down Expand Up @@ -129,7 +148,7 @@ func (es *EventsStreamer) InitDBConnections() (err error) {

// initBinlogReader creates and connects the reader: we hook up to a MySQL server as a replica
func (es *EventsStreamer) initBinlogReader(binlogCoordinates mysql.BinlogCoordinates) error {
goMySQLReader := binlog.NewGoMySQLReader(es.migrationContext)
goMySQLReader := binlog.NewGoMySQLReader(es.migrationContext, es.shouldDecodeRowsEvent)
if err := goMySQLReader.ConnectBinlogStreamer(binlogCoordinates); err != nil {
return err
}
Expand Down
28 changes: 28 additions & 0 deletions go/logic/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,34 @@ func (suite *EventsStreamerTestSuite) TestStreamEventsAutomaticallyReconnects()
suite.Require().Len(dmlEvents, 3)
}

// TestEventsStreamerShouldDecodeRowsEvent checks that shouldDecodeRowsEvent
// matches nothing before a listener is registered and, afterwards, matches only
// the registered database/table pair, ignoring case.
func TestEventsStreamerShouldDecodeRowsEvent(t *testing.T) {
	streamer := NewEventsStreamer(newTestMigrationContext())

	if streamer.shouldDecodeRowsEvent(testMysqlDatabase, testMysqlTableName) {
		t.Fatalf("expected no table match before any listeners are registered")
	}

	noopListener := func(event *binlog.BinlogEntry) error {
		return nil
	}
	if err := streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, noopListener); err != nil {
		t.Fatalf("unexpected AddListener error: %+v", err)
	}

	// Cases are evaluated top to bottom; the first failing expectation aborts
	// the test.
	switch {
	case !streamer.shouldDecodeRowsEvent(testMysqlDatabase, testMysqlTableName):
		t.Fatalf("expected registered table to be decoded")
	case !streamer.shouldDecodeRowsEvent("TEST", "TESTING"):
		t.Fatalf("expected table matching to be case-insensitive")
	case streamer.shouldDecodeRowsEvent(testMysqlDatabase, "other_table"):
		t.Fatalf("expected unregistered table to be skipped")
	case streamer.shouldDecodeRowsEvent("other_database", testMysqlTableName):
		t.Fatalf("expected unregistered database to be skipped")
	}
}

func TestEventsStreamer(t *testing.T) {
if testing.Short() {
t.Skip("skipping events streamer test suite in short mode")
Expand Down
Loading