From edd106f4abfa75efb3bcd8eaa4477150f8c643aa Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 20 May 2026 15:43:06 +0800 Subject: [PATCH 1/2] fix: avoid context leak in blue-green migration test Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pulsar/blue_green_migration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/blue_green_migration_test.go b/pulsar/blue_green_migration_test.go index 4cd2e4765c..2a4b4174e8 100644 --- a/pulsar/blue_green_migration_test.go +++ b/pulsar/blue_green_migration_test.go @@ -162,8 +162,8 @@ func testTopicMigrate( for true { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() _, err := producer.Send(ctx, &pm) + cancel() if err == nil { break } @@ -187,8 +187,8 @@ func testTopicMigrate( for true { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() m, err := consumer.Receive(ctx) + cancel() if err == nil { err = consumer.Ack(m) if err == nil { From bc05b7aae1f7dfb5e4cf255e822413af4ced1f32 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 20 May 2026 15:47:13 +0800 Subject: [PATCH 2/2] fix: avoid infinite retry in blue-green migration test Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- pulsar/blue_green_migration_test.go | 51 +++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/pulsar/blue_green_migration_test.go b/pulsar/blue_green_migration_test.go index 2a4b4174e8..1690849b36 100644 --- a/pulsar/blue_green_migration_test.go +++ b/pulsar/blue_green_migration_test.go @@ -21,6 +21,7 @@ package pulsar import ( "context" + "errors" "fmt" "net/http" "runtime" @@ -148,6 +149,7 @@ func testTopicMigrate( // Signals both producer and consumer have processed `messageCountBeforeUnload` messages wgSendAndReceiveMessages := sync.WaitGroup{} wgSendAndReceiveMessages.Add(2) + errCh := make(chan error, 1) // Producer go func() { @@ -159,6 +161,7 @@ func testTopicMigrate( } pm := ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i))} + retryStarted := time.Now() for true { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) @@ -167,6 +170,20 @@ func testTopicMigrate( if err == nil { break } + if errors.Is(err, ErrTopicTerminated) || errors.Is(err, ErrProducerClosed) { + select { + case errCh <- fmt.Errorf("producer became terminal during migration at message %d: %w", i, err): + default: + } + return + } + if time.Since(retryStarted) > 30*time.Second { + select { + case errCh <- fmt.Errorf("producer send retry exceeded 30s at message %d: %w", i, err): + default: + } + return + } time.Sleep(1000 * time.Millisecond) } @@ -185,6 +202,7 @@ func testTopicMigrate( wgUnload.Wait() } + retryStarted := time.Now() for true { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) m, err := consumer.Receive(ctx) @@ -195,6 +213,13 @@ func testTopicMigrate( break } } + if time.Since(retryStarted) > 30*time.Second { + select { + case errCh <- fmt.Errorf("consumer receive/ack retry exceeded 30s at message %d: %w", i, err): + default: + } + return + } time.Sleep(100 * time.Millisecond) } @@ -205,7 +230,27 @@ func testTopicMigrate( }() // Unload the bundle, triggering the producers and consumers to reconnect to the specified broker. - wgSendAndReceiveMessages.Wait() + waitWithError := func(wg *sync.WaitGroup, stage string) bool { + doneCh := make(chan struct{}) + go func() { + wg.Wait() + close(doneCh) + }() + + select { + case <-doneCh: + return true + case err := <-errCh: + req.NoError(err, stage) + return false + case <-time.After(90 * time.Second): + req.FailNow(stage + " timeout") + return false + } + } + if !waitWithError(&wgSendAndReceiveMessages, "waiting for pre-unload send/receive") { + return + } topicMigrationURL = fmt.Sprintf( "/admin/v2/clusters/%s/migrate?migrated=true", cluster) @@ -248,5 +293,7 @@ func testTopicMigrate( return req.Equal(dstTopicBrokerURL, topicBrokerURL) }) - wgRoutines.Wait() + if !waitWithError(&wgRoutines, "waiting for producer/consumer routines") { + return + } }