diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 4e232b8ab8..4e2cbc4a92 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -296,27 +296,33 @@ func TestPriorityConsumer(t *testing.T) { assert.Nil(t, err) defer producer.Close() - for i := 0; i < 10; i++ { + // Phase 1: Send 15 messages — distributed among the three priority-1 consumers + for i := 0; i < 15; i++ { _, err := producer.Send(context.Background(), &ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), }) assert.Nil(t, err) } - // Drain permits from consumer1 and consumer2 - for i := 0; i < 5; i++ { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - msg, err := consumer1.Receive(ctx) - cancel() - assert.Nil(t, err) - assert.NotNil(t, msg) + // Phase 2: Drain 20 messages from consumer1 and consumer2 to replenish their permits. + // After receiving, each consumer sends individual permits back to the broker, + // so consumer1 and consumer2 will have more permits than consumer3. + for i := 0; i < 20; i++ { + ctx1, cancel1 := context.WithTimeout(context.Background(), 2*time.Second) + _, _ = consumer1.Receive(ctx1) + cancel1() + ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second) + _, _ = consumer2.Receive(ctx2) + cancel2() } + + // Phase 3: Send 5 more messages — broker should dispatch only to consumer1/consumer2 + // because they have more available permits at priority level 1. for i := 0; i < 5; i++ { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - msg, err := consumer2.Receive(ctx) - cancel() + _, err := producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-extra-%d", i)), + }) assert.Nil(t, err) - assert.NotNil(t, msg) } // Low-priority consumer should not have received any messages