diff --git a/pulsar/blue_green_migration_test.go b/pulsar/blue_green_migration_test.go index 4cd2e4765..1690849b3 100644 --- a/pulsar/blue_green_migration_test.go +++ b/pulsar/blue_green_migration_test.go @@ -21,6 +21,7 @@ package pulsar import ( "context" + "errors" "fmt" "net/http" "runtime" @@ -148,6 +149,7 @@ func testTopicMigrate( // Signals both producer and consumer have processed `messageCountBeforeUnload` messages wgSendAndReceiveMessages := sync.WaitGroup{} wgSendAndReceiveMessages.Add(2) + errCh := make(chan error, 1) // Producer go func() { @@ -159,14 +161,29 @@ func testTopicMigrate( } pm := ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i))} + retryStarted := time.Now() for true { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() _, err := producer.Send(ctx, &pm) + cancel() if err == nil { break } + if errors.Is(err, ErrTopicTerminated) || errors.Is(err, ErrProducerClosed) { + select { + case errCh <- fmt.Errorf("producer became terminal during migration at message %d: %w", i, err): + default: + } + return + } + if time.Since(retryStarted) > 30*time.Second { + select { + case errCh <- fmt.Errorf("producer send retry exceeded 30s at message %d: %w", i, err): + default: + } + return + } time.Sleep(1000 * time.Millisecond) } @@ -185,16 +202,24 @@ func testTopicMigrate( wgUnload.Wait() } + retryStarted := time.Now() for true { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() m, err := consumer.Receive(ctx) + cancel() if err == nil { err = consumer.Ack(m) if err == nil { break } } + if time.Since(retryStarted) > 30*time.Second { + select { + case errCh <- fmt.Errorf("consumer receive/ack retry exceeded 30s at message %d: %w", i, err): + default: + } + return + } time.Sleep(100 * time.Millisecond) } @@ -205,7 +230,27 @@ func testTopicMigrate( }() // Unload the bundle, triggering the producers and consumers to reconnect to the specified broker. - wgSendAndReceiveMessages.Wait() + waitWithError := func(wg *sync.WaitGroup, stage string) bool { + doneCh := make(chan struct{}) + go func() { + wg.Wait() + close(doneCh) + }() + + select { + case <-doneCh: + return true + case err := <-errCh: + req.NoError(err, stage) + return false + case <-time.After(90 * time.Second): + req.FailNow(stage + " timeout") + return false + } + } + if !waitWithError(&wgSendAndReceiveMessages, "waiting for pre-unload send/receive") { + return + } topicMigrationURL = fmt.Sprintf( "/admin/v2/clusters/%s/migrate?migrated=true", cluster) @@ -248,5 +293,7 @@ func testTopicMigrate( return req.Equal(dstTopicBrokerURL, topicBrokerURL) }) - wgRoutines.Wait() + if !waitWithError(&wgRoutines, "waiting for producer/consumer routines") { + return + } }