Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions pulsar/blue_green_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package pulsar

import (
"context"
"errors"
"fmt"
"net/http"
"runtime"
Expand Down Expand Up @@ -148,6 +149,7 @@ func testTopicMigrate(
// Signals both producer and consumer have processed `messageCountBeforeUnload` messages
wgSendAndReceiveMessages := sync.WaitGroup{}
wgSendAndReceiveMessages.Add(2)
errCh := make(chan error, 1)

// Producer
go func() {
Expand All @@ -159,14 +161,29 @@ func testTopicMigrate(
}

pm := ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i))}
retryStarted := time.Now()

for true {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_, err := producer.Send(ctx, &pm)
cancel()
if err == nil {
break
}
if errors.Is(err, ErrTopicTerminated) || errors.Is(err, ErrProducerClosed) {
select {
case errCh <- fmt.Errorf("producer became terminal during migration at message %d: %w", i, err):
default:
}
return
}
if time.Since(retryStarted) > 30*time.Second {
select {
case errCh <- fmt.Errorf("producer send retry exceeded 30s at message %d: %w", i, err):
default:
}
return
}
time.Sleep(1000 * time.Millisecond)
}

Expand All @@ -185,16 +202,24 @@ func testTopicMigrate(
wgUnload.Wait()
}

retryStarted := time.Now()
for true {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
m, err := consumer.Receive(ctx)
cancel()
if err == nil {
err = consumer.Ack(m)
if err == nil {
break
}
}
if time.Since(retryStarted) > 30*time.Second {
select {
case errCh <- fmt.Errorf("consumer receive/ack retry exceeded 30s at message %d: %w", i, err):
default:
}
return
}
time.Sleep(100 * time.Millisecond)
}

Expand All @@ -205,7 +230,27 @@ func testTopicMigrate(
}()

// Unload the bundle, triggering the producers and consumers to reconnect to the specified broker.
wgSendAndReceiveMessages.Wait()
// waitWithError blocks until one of three things happens: wg completes,
// an error arrives on errCh, or 90 seconds elapse. It returns true only
// when wg completed and no error is pending on errCh. stage is used as
// the assertion message so a failure identifies which wait was in progress.
waitWithError := func(wg *sync.WaitGroup, stage string) bool {
	// Bridge the WaitGroup into a channel so it can take part in select.
	doneCh := make(chan struct{})
	go func() {
		wg.Wait()
		close(doneCh)
	}()

	// Use a stoppable timer so it is released as soon as the wait ends.
	timeout := time.NewTimer(90 * time.Second)
	defer timeout.Stop()

	select {
	case <-doneCh:
		// select picks pseudo-randomly among ready cases, so doneCh may win
		// even though an error is already queued on errCh. Check errCh once
		// more without blocking so a reported error is never dropped.
		select {
		case err := <-errCh:
			req.NoError(err, stage)
			return false
		default:
			return true
		}
	case err := <-errCh:
		req.NoError(err, stage)
		return false
	case <-timeout.C:
		req.FailNow(stage + " timeout")
		return false
	}
}
if !waitWithError(&wgSendAndReceiveMessages, "waiting for pre-unload send/receive") {
return
}

topicMigrationURL = fmt.Sprintf(
"/admin/v2/clusters/%s/migrate?migrated=true", cluster)
Expand Down Expand Up @@ -248,5 +293,7 @@ func testTopicMigrate(
return req.Equal(dstTopicBrokerURL, topicBrokerURL)
})

wgRoutines.Wait()
if !waitWithError(&wgRoutines, "waiting for producer/consumer routines") {
return
}
}
Loading