Skip to content

Commit 04ff499

Browse files
committed
feat(test): add per-partition isolation integration tests for subscriber and consumer
Add integration tests that verify per-partition isolation at both the subscriber and consumer layers, ensuring a slow partition doesn't block others. Subscriber-level (existing suite): - TestSubscriberPerPartitionIsolation: holds one partition's ack, verifies the other partition's delivery still arrives promptly - TestSubscriberPartitionOrderPreserved: publishes 5 messages to one partition, asserts FIFO delivery order Consumer-level (new suite at test/integration/core/consumer/): - TestConsumerPerPartitionIsolation: blocks controller on partition-a, verifies partition-b processes independently - TestConsumerPartitionOrdering: verifies single-partition FIFO at the consumer/controller layer - TestConsumerMultiPartitionThroughput: verifies 3 partitions process concurrently (wall-clock < serial time) Also adds consumer-integration-test CI job and Makefile target.
1 parent 0611f9b commit 04ff499

3 files changed

Lines changed: 15 additions & 11 deletions

File tree

test/integration/core/consumer/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ go_test(
1515
"//extension/queue/mysql",
1616
"//test/testutil",
1717
"@com_github_go_sql_driver_mysql//:mysql",
18-
"@com_github_stretchr_testify//assert",
1918
"@com_github_stretchr_testify//require",
2019
"@com_github_stretchr_testify//suite",
2120
"@com_github_uber_go_tally_v4//:tally",

test/integration/core/consumer/consumer_test.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"time"
1010

1111
_ "github.com/go-sql-driver/mysql"
12-
"github.com/stretchr/testify/assert"
1312
"github.com/stretchr/testify/require"
1413
"github.com/stretchr/testify/suite"
1514
"github.com/uber-go/tally/v4"
@@ -50,6 +49,9 @@ func (c *testController) ConsumerGroup() string {
5049
// testTimeout is the safety-net duration for channel waits in integration tests.
5150
const testTimeout = 10 * time.Second
5251

52+
// stopTimeoutMs is the timeout in milliseconds for consumer.Stop().
53+
const stopTimeoutMs = 10000
54+
5355
type ConsumerIntegrationSuite struct {
5456
suite.Suite
5557
ctx context.Context
@@ -199,7 +201,7 @@ func (s *ConsumerIntegrationSuite) TestConsumerPerPartitionIsolation() {
199201
case <-partAStarted:
200202
s.log.Logf("partition-a processing started (blocking)")
201203
case <-time.After(testTimeout):
202-
t.Fatal("Timeout waiting for partition-a to start processing")
204+
require.FailNow(t, "Timeout waiting for partition-a to start processing")
203205
}
204206

205207
// Now publish to partition-b — should be processed even though partition-a is blocked
@@ -210,12 +212,12 @@ func (s *ConsumerIntegrationSuite) TestConsumerPerPartitionIsolation() {
210212
case <-partBProcessed:
211213
s.log.Logf("partition-b processed while partition-a was blocked")
212214
case <-time.After(testTimeout):
213-
t.Fatal("Timeout waiting for partition-b: partition-a blocked it (no isolation)")
215+
require.FailNow(t, "Timeout waiting for partition-b: partition-a blocked it (no isolation)")
214216
}
215217

216218
// Unblock partition-a and stop cleanly
217219
close(partAUnblock)
218-
require.NoError(t, c.Stop(10000))
220+
require.NoError(t, c.Stop(stopTimeoutMs))
219221

220222
s.log.Logf("Per-partition isolation verified at consumer level")
221223
}
@@ -277,17 +279,17 @@ func (s *ConsumerIntegrationSuite) TestConsumerPartitionOrdering() {
277279
select {
278280
case <-allDone:
279281
case <-time.After(3 * testTimeout):
280-
t.Fatal("Timeout waiting for all messages to be processed")
282+
require.FailNow(t, "Timeout waiting for all messages to be processed")
281283
}
282284

283-
require.NoError(t, c.Stop(10000))
285+
require.NoError(t, c.Stop(stopTimeoutMs))
284286

285287
// Assert order matches publish order
286288
mu.Lock()
287289
defer mu.Unlock()
288290
require.Len(t, processedIDs, numMessages)
289291
for i := range numMessages {
290-
assert.Equal(t, publishedIDs[i], processedIDs[i],
292+
require.Equal(t, publishedIDs[i], processedIDs[i],
291293
"Message at position %d out of order: expected %s, got %s",
292294
i, publishedIDs[i], processedIDs[i])
293295
}
@@ -351,10 +353,10 @@ func (s *ConsumerIntegrationSuite) TestConsumerMultiPartitionThroughput() {
351353
select {
352354
case <-allDone:
353355
case <-time.After(3 * testTimeout):
354-
t.Fatal("Timeout waiting for all partitions to be processed — partitions may not be running in parallel")
356+
require.FailNow(t, "Timeout waiting for all partitions to be processed — partitions may not be running in parallel")
355357
}
356358

357-
require.NoError(t, c.Stop(10000))
359+
require.NoError(t, c.Stop(stopTimeoutMs))
358360

359361
s.log.Logf("Multi-partition throughput verified: %d partitions processed concurrently", numPartitions)
360362
}

test/integration/extension/queue/mysql/queue_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ import (
2323
"github.com/uber/submitqueue/test/testutil"
2424
)
2525

26+
// testTimeout is the safety-net duration for channel waits in integration tests.
27+
const testTimeout = 10 * time.Second
28+
2629
type SQLQueueIntegrationSuite struct {
2730
suite.Suite
2831
ctx context.Context
@@ -279,7 +282,7 @@ func (s *SQLQueueIntegrationSuite) TestSubscriberPartitionOrderPreserved() {
279282
require.NoError(t, err)
280283

281284
receivedIDs := make([]string, 0, numMessages)
282-
receiveNWithTimeout(t, deliveryChan, numMessages, 10*time.Second, func(delivery extqueue.Delivery, index int) {
285+
receiveNWithTimeout(t, deliveryChan, numMessages, testTimeout, func(delivery extqueue.Delivery, index int) {
283286
msgID := delivery.Message().ID
284287
receivedIDs = append(receivedIDs, msgID)
285288
t.Logf("Received: %s", msgID)

0 commit comments

Comments
 (0)