From f3985dee4d7d7aed0b4a5c0b1e452c02a31a8754 Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Wed, 20 May 2026 10:42:34 +0200 Subject: [PATCH 1/3] Fix resume data loss: route heartbeat coords through applyEventsQueue onChangelogHeartbeatEvent was mutating applier.CurrentCoordinates directly from the streamer goroutine, before any DML that preceded the heartbeat was applied to the ghost table. The checkpoint loop reads CurrentCoordinates as "applied through this GTID" and could persist a checkpoint whose LastTrxCoords was ahead of what was actually applied. If gh-ost crashed before applyEventsQueue drained, --resume read that checkpoint and called StartSyncGTID with the persisted set; MySQL treated the un-applied GTIDs as already-seen and never re-streamed them. The ghost table silently lost those DMLs and cut-over produced a stale table. Fix: enqueue a tableWriteFunc onto applyEventsQueue that performs the coords bump. The apply goroutine executes it in order, after the DMLs the streamer enqueued before the heartbeat, restoring the invariant. Adds TestMigratorHeartbeatDoesNotAdvancePastUnappliedDML, which fails at the previous HEAD and passes after the fix; also asserts queue ordering to guard against future changes that wrap the heartbeat enqueue in a goroutine. Co-authored-by: Bastian Bartmann --- go/logic/migrator.go | 23 ++++++++++-- go/logic/migrator_test.go | 75 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 3 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index a1a78d1e3..425f0a385 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -326,13 +326,30 @@ func (mgtr *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (e heartbeatTime, err := time.Parse(time.RFC3339Nano, changelogHeartbeatString) if err != nil { return mgtr.migrationContext.Log.Errore(err) - } else { - mgtr.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime) + } + mgtr.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime) + + // Route the coords bump through applyEventsQueue so it is ordered after + // any DMLs the streamer enqueued before this heartbeat. + // + // If we instead mutated applier.CurrentCoordinates directly from this + // streamer goroutine, the checkpoint loop (Migrator.Checkpoint) could + // observe coords that include DMLs still sitting un-applied in + // applyEventsQueue and write a checkpoint row whose LastTrxCoords is + // AHEAD of what has actually been applied to the ghost table. If gh-ost + // then crashes before the queue drains, resume reads that checkpoint and + // calls StartSyncGTID with the persisted set; MySQL treats the un-applied + // GTIDs as already-seen and never re-streams them, so the ghost table + // silently loses those DMLs and cut-over produces a stale table. + coords := dmlEntry.Coordinates + var writeFunc tableWriteFunc = func() error { mgtr.applier.CurrentCoordinatesMutex.Lock() - mgtr.applier.CurrentCoordinates = dmlEntry.Coordinates + mgtr.applier.CurrentCoordinates = coords mgtr.applier.CurrentCoordinatesMutex.Unlock() return nil } + mgtr.applyEventsQueue <- newApplyEventStructByFunc(&writeFunc) + return nil } // abort stores the error, cancels the context, and logs the abort. diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 8fc48e326..95278fc3d 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -194,6 +194,81 @@ func TestMigratorOnChangelogEvent(t *testing.T) { }) } +// Regression: heartbeats must not advance applier.CurrentCoordinates past +// DMLs still sitting in applyEventsQueue. If they do, checkpointLoop will +// persist a GTID set that includes un-applied transactions, and resume via +// StartSyncGTID will skip them (the server treats them as already-seen). +func TestMigratorHeartbeatDoesNotAdvancePastUnappliedDML(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.UseGTIDs = true + migrator := NewMigrator(migrationContext, "test") + migrator.applier = NewApplier(migrationContext) + + const srcUUID = "00000000-0000-0000-0000-000000000001" + + // A DML on the original table at GTID :100 is observed and enqueued, but + // not yet applied. + dmlCoords, err := mysql.NewGTIDBinlogCoordinates(srcUUID + ":1-100") + require.NoError(t, err) + migrator.applyEventsQueue <- newApplyEventStructByDML(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + TableName: migrationContext.OriginalTableName, + DML: binlog.UpdateDML, + }, + Coordinates: dmlCoords, + }) + require.Equal(t, 1, len(migrator.applyEventsQueue), + "DML must be sitting un-applied in the queue") + + // A heartbeat row is then written; its GTID set includes the un-applied + // DML plus a few additional transactions. + heartbeatCoords, err := mysql.NewGTIDBinlogCoordinates(srcUUID + ":1-105") + require.NoError(t, err) + heartbeatColumnValues := sql.ToColumnValues([]interface{}{ + 123, + time.Now().Unix(), + "heartbeat", + time.Now().Format(time.RFC3339Nano), + }) + require.NoError(t, migrator.onChangelogHeartbeatEvent(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: heartbeatColumnValues, + }, + Coordinates: heartbeatCoords, + })) + + // The DML is still un-applied; the heartbeat's coords-bump sentinel has + // been enqueued behind it. + require.Equal(t, 2, len(migrator.applyEventsQueue), + "queue must hold the un-applied DML and the heartbeat sentinel; "+ + "this test does not drain the queue") + + // Invariant: CurrentCoordinates must NOT have advanced past the queued DML. + currentCoords := migrator.applier.CurrentCoordinates + require.False(t, currentCoords != nil && dmlCoords.SmallerThanOrEquals(currentCoords), + "CurrentCoordinates must not cover the un-applied DML at %s (got %v)", + dmlCoords.DisplayString(), currentCoords) + + // Consequence: the checkpoint gate in Migrator.Checkpoint must NOT fire + // for streamer coords that include the un-applied DML. + require.False(t, currentCoords != nil && heartbeatCoords.SmallerThanOrEquals(currentCoords), + "checkpoint gate must not fire while DML at %s is un-applied", + dmlCoords.DisplayString()) + + // Ordering: the DML must come first, then the heartbeat sentinel. If a + // future change ever wraps the heartbeat enqueue in `go func()`, this + // invariant breaks and the bug returns. + firstQueued := <-migrator.applyEventsQueue + secondQueued := <-migrator.applyEventsQueue + require.NotNil(t, firstQueued.dmlEvent, "first queued event must be the DML") + require.Nil(t, firstQueued.writeFunc, "first queued event must not be a sentinel") + require.Nil(t, secondQueued.dmlEvent, "second queued event must not be a DML") + require.NotNil(t, secondQueued.writeFunc, "second queued event must be the heartbeat sentinel") +} + func TestMigratorValidateStatement(t *testing.T) { t.Run("add-column", func(t *testing.T) { migrationContext := base.NewMigrationContext() From 5d1ede703857d6c8fa96924c9020bb944bae2d93 Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Wed, 20 May 2026 11:05:42 +0200 Subject: [PATCH 2/3] Clean up comment Co-authored-by: Bastian Bartmann --- go/logic/migrator.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 425f0a385..dbc7fc3c8 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -331,16 +331,6 @@ func (mgtr *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (e // Route the coords bump through applyEventsQueue so it is ordered after // any DMLs the streamer enqueued before this heartbeat. - // - // If we instead mutated applier.CurrentCoordinates directly from this - // streamer goroutine, the checkpoint loop (Migrator.Checkpoint) could - // observe coords that include DMLs still sitting un-applied in - // applyEventsQueue and write a checkpoint row whose LastTrxCoords is - // AHEAD of what has actually been applied to the ghost table. If gh-ost - // then crashes before the queue drains, resume reads that checkpoint and - // calls StartSyncGTID with the persisted set; MySQL treats the un-applied - // GTIDs as already-seen and never re-streams them, so the ghost table - // silently loses those DMLs and cut-over produces a stale table. coords := dmlEntry.Coordinates var writeFunc tableWriteFunc = func() error { mgtr.applier.CurrentCoordinatesMutex.Lock() From d96ad02f075eca1fe9c7d4a19026f43001d90143 Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Thu, 21 May 2026 10:44:25 +0200 Subject: [PATCH 3/3] Replace direct channel write with SendWithContext Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- go/logic/migrator.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index dbc7fc3c8..5dba9e7ed 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -338,7 +338,13 @@ func (mgtr *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (e mgtr.applier.CurrentCoordinatesMutex.Unlock() return nil } - mgtr.applyEventsQueue <- newApplyEventStructByFunc(&writeFunc) + if err := base.SendWithContext( + mgtr.migrationContext.GetContext(), + mgtr.applyEventsQueue, + newApplyEventStructByFunc(&writeFunc), + ); err != nil { + return mgtr.migrationContext.Log.Errore(err) + } return nil }