diff --git a/go/logic/migrator.go b/go/logic/migrator.go index a1a78d1e3..5dba9e7ed 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -326,13 +326,26 @@ func (mgtr *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (e heartbeatTime, err := time.Parse(time.RFC3339Nano, changelogHeartbeatString) if err != nil { return mgtr.migrationContext.Log.Errore(err) - } else { - mgtr.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime) + } + mgtr.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime) + + // Route the coords bump through applyEventsQueue so it is ordered after + // any DMLs the streamer enqueued before this heartbeat. + coords := dmlEntry.Coordinates + var writeFunc tableWriteFunc = func() error { mgtr.applier.CurrentCoordinatesMutex.Lock() - mgtr.applier.CurrentCoordinates = dmlEntry.Coordinates + mgtr.applier.CurrentCoordinates = coords mgtr.applier.CurrentCoordinatesMutex.Unlock() return nil } + if err := base.SendWithContext( + mgtr.migrationContext.GetContext(), + mgtr.applyEventsQueue, + newApplyEventStructByFunc(&writeFunc), + ); err != nil { + return mgtr.migrationContext.Log.Errore(err) + } + return nil } // abort stores the error, cancels the context, and logs the abort. diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 8fc48e326..95278fc3d 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -194,6 +194,81 @@ func TestMigratorOnChangelogEvent(t *testing.T) { }) } +// Regression: heartbeats must not advance applier.CurrentCoordinates past +// DMLs still sitting in applyEventsQueue. If they do, checkpointLoop will +// persist a GTID set that includes un-applied transactions, and resume via +// StartSyncGTID will skip them (the server treats them as already-seen). +func TestMigratorHeartbeatDoesNotAdvancePastUnappliedDML(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.UseGTIDs = true + migrator := NewMigrator(migrationContext, "test") + migrator.applier = NewApplier(migrationContext) + + const srcUUID = "00000000-0000-0000-0000-000000000001" + + // A DML on the original table at GTID :100 is observed and enqueued, but + // not yet applied. + dmlCoords, err := mysql.NewGTIDBinlogCoordinates(srcUUID + ":1-100") + require.NoError(t, err) + migrator.applyEventsQueue <- newApplyEventStructByDML(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + TableName: migrationContext.OriginalTableName, + DML: binlog.UpdateDML, + }, + Coordinates: dmlCoords, + }) + require.Equal(t, 1, len(migrator.applyEventsQueue), + "DML must be sitting un-applied in the queue") + + // A heartbeat row is then written; its GTID set includes the un-applied + // DML plus a few additional transactions. + heartbeatCoords, err := mysql.NewGTIDBinlogCoordinates(srcUUID + ":1-105") + require.NoError(t, err) + heartbeatColumnValues := sql.ToColumnValues([]interface{}{ + 123, + time.Now().Unix(), + "heartbeat", + time.Now().Format(time.RFC3339Nano), + }) + require.NoError(t, migrator.onChangelogHeartbeatEvent(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: heartbeatColumnValues, + }, + Coordinates: heartbeatCoords, + })) + + // The DML is still un-applied; the heartbeat's coords-bump sentinel has + // been enqueued behind it. + require.Equal(t, 2, len(migrator.applyEventsQueue), + "queue must hold the un-applied DML and the heartbeat sentinel; "+ + "this test does not drain the queue") + + // Invariant: CurrentCoordinates must NOT have advanced past the queued DML. + currentCoords := migrator.applier.CurrentCoordinates + require.False(t, currentCoords != nil && dmlCoords.SmallerThanOrEquals(currentCoords), + "CurrentCoordinates must not cover the un-applied DML at %s (got %v)", + dmlCoords.DisplayString(), currentCoords) + + // Consequence: the checkpoint gate in Migrator.Checkpoint must NOT fire + // for streamer coords that include the un-applied DML. + require.False(t, currentCoords != nil && heartbeatCoords.SmallerThanOrEquals(currentCoords), + "checkpoint gate must not fire while DML at %s is un-applied", + dmlCoords.DisplayString()) + + // Ordering: the DML must come first, then the heartbeat sentinel. If a + // future change ever wraps the heartbeat enqueue in `go func()`, this + // invariant breaks and the bug returns. + firstQueued := <-migrator.applyEventsQueue + secondQueued := <-migrator.applyEventsQueue + require.NotNil(t, firstQueued.dmlEvent, "first queued event must be the DML") + require.Nil(t, firstQueued.writeFunc, "first queued event must not be a sentinel") + require.Nil(t, secondQueued.dmlEvent, "second queued event must not be a DML") + require.NotNil(t, secondQueued.writeFunc, "second queued event must be the heartbeat sentinel") +} + func TestMigratorValidateStatement(t *testing.T) { t.Run("add-column", func(t *testing.T) { migrationContext := base.NewMigrationContext()