From f100865215e14860ebe11333b4ae65b17d2f41e5 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Fri, 6 Mar 2026 12:14:14 -0800 Subject: [PATCH] feat(test): add per-partition isolation integration tests for subscriber and consumer --- .github/workflows/ci.yml | 14 + Makefile | 6 +- test/integration/core/consumer/BUILD.bazel | 23 ++ .../core/consumer/consumer_test.go | 362 ++++++++++++++++++ .../core/consumer/docker-compose.yml | 20 + .../extension/queue/mysql/queue_test.go | 110 ++++++ 6 files changed, 534 insertions(+), 1 deletion(-) create mode 100644 test/integration/core/consumer/BUILD.bazel create mode 100644 test/integration/core/consumer/consumer_test.go create mode 100644 test/integration/core/consumer/docker-compose.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 91531233..4af1e238 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -113,6 +113,19 @@ jobs: with: target: //test/integration/extension/storage/... + # --------------------------------------------------------------------------- + # CORE TESTS + # --------------------------------------------------------------------------- + consumer-integration-test: + name: Consumer Integration Test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: ./.github/actions/setup + - uses: ./.github/actions/run-bazel-test + with: + target: //test/integration/core/consumer/... + # --------------------------------------------------------------------------- # REQUIRED CHECKS GATE # --------------------------------------------------------------------------- @@ -127,6 +140,7 @@ jobs: - counter-integration-test - queue-integration-test - storage-integration-test + - consumer-integration-test steps: - name: All required checks passed run: | diff --git a/Makefile b/Makefile index 143bf5bf..77ad1f94 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ LOCAL_PROJECT = submitqueue # Set REPO_ROOT for docker-compose export REPO_ROOT := $(shell pwd) -.PHONY: build build-all-linux build-gateway-linux build-orchestrator-linux clean clean-proto deps e2e-test gazelle integration-test integration-test-extensions integration-test-gateway integration-test-orchestrator license-fix lint lint-license local-clean local-gateway-start local-gateway-stop local-init-schemas local-logs local-orchestrator-start local-orchestrator-stop local-ps local-restart local-start local-stop proto query-deps query-targets run-client-gateway run-client-orchestrator run-queue-admin test test-no-cache help +.PHONY: build build-all-linux build-gateway-linux build-orchestrator-linux clean clean-proto deps e2e-test gazelle integration-test integration-test-consumer integration-test-extensions integration-test-gateway integration-test-orchestrator license-fix lint lint-license local-clean local-gateway-start local-gateway-stop local-init-schemas local-logs local-orchestrator-start local-orchestrator-stop local-ps local-restart local-start local-stop proto query-deps query-targets run-client-gateway run-client-orchestrator run-queue-admin test test-no-cache help build: ## Build all services and examples @@ -71,6 +71,10 @@ integration-test: build-all-linux ## Run all integration tests (auto-builds bina @echo "Running all integration tests..." @$(BAZEL) test //test/integration/... --test_output=streamed +integration-test-consumer: ## Run Consumer integration tests + @echo "Running Consumer integration tests..." + @$(BAZEL) test //test/integration/core/consumer:consumer_test --test_output=streamed + integration-test-extensions: ## Run extension integration tests @echo "Running extension integration tests..." @$(BAZEL) test //test/integration/extension/... --test_output=streamed diff --git a/test/integration/core/consumer/BUILD.bazel b/test/integration/core/consumer/BUILD.bazel new file mode 100644 index 00000000..3eafea20 --- /dev/null +++ b/test/integration/core/consumer/BUILD.bazel @@ -0,0 +1,23 @@ +load("@rules_go//go:def.bzl", "go_test") + +go_test( + name = "consumer_test", + srcs = ["consumer_test.go"], + data = [ + "docker-compose.yml", + "//extension/queue/mysql/schema", + ], + tags = ["integration"], + deps = [ + "//core/consumer", + "//entity/queue", + "//extension/queue", + "//extension/queue/mysql", + "//test/testutil", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_stretchr_testify//require", + "@com_github_stretchr_testify//suite", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/test/integration/core/consumer/consumer_test.go b/test/integration/core/consumer/consumer_test.go new file mode 100644 index 00000000..828b1150 --- /dev/null +++ b/test/integration/core/consumer/consumer_test.go @@ -0,0 +1,362 @@ +package consumer + +import ( + "context" + "database/sql" + "fmt" + "sync" + "testing" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/uber-go/tally/v4" + "go.uber.org/zap/zaptest" + + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity/queue" + extqueue "github.com/uber/submitqueue/extension/queue" + queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" + "github.com/uber/submitqueue/test/testutil" +) + +// testController implements consumer.Controller for integration tests. +// Each test configures the processFunc to control behavior per partition. +type testController struct { + name string + topicKey consumer.TopicKey + consumerGroup string + processFunc func(ctx context.Context, delivery consumer.Delivery) error +} + +func (c *testController) Process(ctx context.Context, delivery consumer.Delivery) error { + return c.processFunc(ctx, delivery) +} + +func (c *testController) Name() string { + return c.name +} + +func (c *testController) TopicKey() consumer.TopicKey { + return c.topicKey +} + +func (c *testController) ConsumerGroup() string { + return c.consumerGroup +} + +// testTimeout is the safety-net duration for channel waits in integration tests. +const testTimeout = 10 * time.Second + +// stopTimeoutMs is the timeout in milliseconds for consumer.Stop(). +const stopTimeoutMs = 10000 + +type ConsumerIntegrationSuite struct { + suite.Suite + ctx context.Context + stack *testutil.ComposeStack + db *sql.DB + log *testutil.TestLogger +} + +func TestConsumerIntegration(t *testing.T) { + suite.Run(t, new(ConsumerIntegrationSuite)) +} + +func (s *ConsumerIntegrationSuite) SetupSuite() { + t := s.T() + s.ctx = context.Background() + s.log = testutil.NewTestLogger(t) + + s.log.Logf("Starting Consumer integration test suite using docker-compose") + + s.stack = testutil.NewComposeStack( + t, + s.log, + s.ctx, + "docker-compose.yml", + "core-consumer", + ) + + err := s.stack.Up() + require.NoError(t, err, "failed to start compose stack") + + s.db, err = s.stack.ConnectMySQLService("mysql") + require.NoError(t, err, "failed to connect to MySQL") + + schemaDir := testutil.SchemaDir("extension/queue/mysql/schema") + testutil.ApplySchema(t, s.log, s.db, schemaDir) + + t.Cleanup(func() { + if s.db != nil { + s.db.Close() + } + }) + + s.log.Logf("Consumer integration test suite ready") +} + +func (s *ConsumerIntegrationSuite) TearDownSuite() { + s.log.Logf("Tearing down Consumer integration test suite") +} + +// newQueue creates a new MySQL-backed queue for testing. +func (s *ConsumerIntegrationSuite) newQueue(t *testing.T) extqueue.Queue { + t.Helper() + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, + Logger: zaptest.NewLogger(t), + MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + return q +} + +// newConsumer creates a consumer with a TopicRegistry wired to the given queue and topic. +func (s *ConsumerIntegrationSuite) newConsumer(t *testing.T, q extqueue.Queue, topicKey consumer.TopicKey, topicName string, consumerGroup string) consumer.Consumer { + t.Helper() + logger := zaptest.NewLogger(t).Sugar() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + { + Key: topicKey, + Name: topicName, + Queue: q, + Subscription: extqueue.SubscriptionConfig{ + SubscriberName: "test-worker", + ConsumerGroup: consumerGroup, + PollIntervalMs: 100, + BatchSize: 10, + VisibilityTimeoutMs: 60000, + LeaseRenewalIntervalMs: 2000, + LeaseDurationMs: 5000, + Retry: extqueue.RetryConfig{ + MaxAttempts: 3, + InitialBackoffMs: 1000, + MaxBackoffMs: 30000, + BackoffMultiplier: 2.0, + }, + DLQ: extqueue.DLQConfig{ + Enabled: true, + TopicSuffix: "_dlq", + }, + }, + }, + }) + require.NoError(t, err) + + return consumer.New(logger, tally.NoopScope, registry) +} + +func (s *ConsumerIntegrationSuite) TestConsumerPerPartitionIsolation() { + t := s.T() + + topicKey := consumer.TopicKey("isolation-test") + topicName := "consumer-isolation-topic" + consumerGroup := "isolation-group" + + q := s.newQueue(t) + defer q.Close() + + publisher := q.Publisher() + + // Channels for synchronizing the test with the controller + partAStarted := make(chan struct{}) // signals partition-a processing began + partAUnblock := make(chan struct{}) // unblocks partition-a processing + partBProcessed := make(chan struct{}) // signals partition-b was processed + + ctrl := &testController{ + name: "isolation-controller", + topicKey: topicKey, + consumerGroup: consumerGroup, + processFunc: func(ctx context.Context, delivery consumer.Delivery) error { + partition := delivery.Message().PartitionKey + t.Logf("Controller processing: partition=%s id=%s", partition, delivery.Message().ID) + + if partition == "partition-a" { + close(partAStarted) + // Block until test unblocks us + select { + case <-partAUnblock: + case <-ctx.Done(): + return ctx.Err() + } + } else if partition == "partition-b" { + close(partBProcessed) + } + return nil + }, + } + + c := s.newConsumer(t, q, topicKey, topicName, consumerGroup) + require.NoError(t, c.Register(ctrl)) + require.NoError(t, c.Start(s.ctx)) + + // Publish to partition-a, wait for it to start blocking + msgA := queue.NewMessage("iso-a", []byte("data-a"), "partition-a", nil) + require.NoError(t, publisher.Publish(s.ctx, topicName, msgA)) + + select { + case <-partAStarted: + s.log.Logf("partition-a processing started (blocking)") + case <-time.After(testTimeout): + require.FailNow(t, "Timeout waiting for partition-a to start processing") + } + + // Now publish to partition-b — should be processed even though partition-a is blocked + msgB := queue.NewMessage("iso-b", []byte("data-b"), "partition-b", nil) + require.NoError(t, publisher.Publish(s.ctx, topicName, msgB)) + + select { + case <-partBProcessed: + s.log.Logf("partition-b processed while partition-a was blocked") + case <-time.After(testTimeout): + require.FailNow(t, "Timeout waiting for partition-b: partition-a blocked it (no isolation)") + } + + // Unblock partition-a and stop cleanly + close(partAUnblock) + require.NoError(t, c.Stop(stopTimeoutMs)) + + s.log.Logf("Per-partition isolation verified at consumer level") +} + +func (s *ConsumerIntegrationSuite) TestConsumerPartitionOrdering() { + t := s.T() + + topicKey := consumer.TopicKey("ordering-test") + topicName := "consumer-ordering-topic" + consumerGroup := "ordering-group" + + q := s.newQueue(t) + defer q.Close() + + publisher := q.Publisher() + + // Publish 5 messages to the same partition + numMessages := 5 + publishedIDs := make([]string, numMessages) + for i := range numMessages { + msgID := fmt.Sprintf("order-%03d", i) + publishedIDs[i] = msgID + msg := queue.NewMessage(msgID, []byte(fmt.Sprintf("payload-%d", i)), "single-partition", nil) + require.NoError(t, publisher.Publish(s.ctx, topicName, msg)) + } + s.log.Logf("Published %d messages to single-partition", numMessages) + + // Track processing order under a lock; signal allDone when complete. + var mu sync.Mutex + processedIDs := make([]string, 0, numMessages) + allDone := make(chan struct{}) + + ctrl := &testController{ + name: "ordering-controller", + topicKey: topicKey, + consumerGroup: consumerGroup, + processFunc: func(ctx context.Context, delivery consumer.Delivery) error { + msgID := delivery.Message().ID + t.Logf("Processing: %s", msgID) + + // Record this message's ID; when all expected messages + // have been processed, signal completion via allDone. + mu.Lock() + processedIDs = append(processedIDs, msgID) + done := len(processedIDs) == numMessages + mu.Unlock() + + if done { + close(allDone) + } + return nil + }, + } + + c := s.newConsumer(t, q, topicKey, topicName, consumerGroup) + require.NoError(t, c.Register(ctrl)) + require.NoError(t, c.Start(s.ctx)) + + select { + case <-allDone: + case <-time.After(3 * testTimeout): + require.FailNow(t, "Timeout waiting for all messages to be processed") + } + + require.NoError(t, c.Stop(stopTimeoutMs)) + + // Assert order matches publish order + mu.Lock() + defer mu.Unlock() + require.Len(t, processedIDs, numMessages) + for i := range numMessages { + require.Equal(t, publishedIDs[i], processedIDs[i], + "Message at position %d out of order: expected %s, got %s", + i, publishedIDs[i], processedIDs[i]) + } + + s.log.Logf("Partition ordering verified: all %d messages processed in FIFO order", numMessages) +} + +func (s *ConsumerIntegrationSuite) TestConsumerMultiPartitionThroughput() { + t := s.T() + + topicKey := consumer.TopicKey("throughput-test") + topicName := "consumer-throughput-topic" + consumerGroup := "throughput-group" + + q := s.newQueue(t) + defer q.Close() + + publisher := q.Publisher() + + // Publish 1 message to each of 3 partitions + numPartitions := 3 + for i := range numPartitions { + partition := fmt.Sprintf("tp-partition-%d", i) + msg := queue.NewMessage(fmt.Sprintf("tp-msg-%d", i), []byte("data"), partition, nil) + require.NoError(t, publisher.Publish(s.ctx, topicName, msg)) + } + s.log.Logf("Published 1 message to each of %d partitions", numPartitions) + + // Barrier: each partition signals arrival, then waits for all partitions + // to start before completing. Proves parallel execution without timing. + var barrier sync.WaitGroup + barrier.Add(numPartitions) + + var done sync.WaitGroup + done.Add(numPartitions) + + ctrl := &testController{ + name: "throughput-controller", + topicKey: topicKey, + consumerGroup: consumerGroup, + processFunc: func(ctx context.Context, delivery consumer.Delivery) error { + t.Logf("Processing partition=%s", delivery.Message().PartitionKey) + barrier.Done() // signal: this partition started + barrier.Wait() // block until all partitions are processing concurrently + done.Done() + return nil + }, + } + + c := s.newConsumer(t, q, topicKey, topicName, consumerGroup) + require.NoError(t, c.Register(ctrl)) + require.NoError(t, c.Start(s.ctx)) + + // Wait for all partitions to complete + allDone := make(chan struct{}) + go func() { + done.Wait() + close(allDone) + }() + + select { + case <-allDone: + case <-time.After(3 * testTimeout): + require.FailNow(t, "Timeout waiting for all partitions to be processed — partitions may not be running in parallel") + } + + require.NoError(t, c.Stop(stopTimeoutMs)) + + s.log.Logf("Multi-partition throughput verified: %d partitions processed concurrently", numPartitions) +} diff --git a/test/integration/core/consumer/docker-compose.yml b/test/integration/core/consumer/docker-compose.yml new file mode 100644 index 00000000..9268a9c1 --- /dev/null +++ b/test/integration/core/consumer/docker-compose.yml @@ -0,0 +1,20 @@ +# Docker Compose for Consumer Integration Tests +# Tests the consumer framework with a real MySQL-backed queue + +services: + # MySQL database for queue infrastructure + mysql: + image: mysql:8.0 + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: submitqueue + ports: + - "3306" # Random ephemeral port to avoid conflicts + healthcheck: + # Use 127.0.0.1 (TCP) instead of localhost (Unix socket). MySQL treats + # "localhost" as a socket connection, which can be ready before the TCP + # listener — causing dependent services that connect over TCP to fail. + test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"] + interval: 5s + timeout: 5s + retries: 10 diff --git a/test/integration/extension/queue/mysql/queue_test.go b/test/integration/extension/queue/mysql/queue_test.go index e45040a7..b54fba2d 100644 --- a/test/integration/extension/queue/mysql/queue_test.go +++ b/test/integration/extension/queue/mysql/queue_test.go @@ -37,6 +37,9 @@ import ( "github.com/uber/submitqueue/test/testutil" ) +// testTimeout is the safety-net duration for channel waits in integration tests. +const testTimeout = 10 * time.Second + type SQLQueueIntegrationSuite struct { suite.Suite ctx context.Context @@ -203,6 +206,113 @@ func (s *SQLQueueIntegrationSuite) TestPublishAndSubscribe() { t.Logf("Successfully received and acked 2 messages with metadata verified") } +func (s *SQLQueueIntegrationSuite) TestSubscriberPerPartitionIsolation() { + t := s.T() + + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, + Logger: zaptest.NewLogger(t), + MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q.Close() + + publisher := q.Publisher() + subscriber := q.Subscriber() + + topic := "subscriber_isolation_topic" + + // Subscribe with short poll interval for fast test + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "isolation-consumer") + subConfig.PollIntervalMs = 100 + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) + require.NoError(t, err) + + // Publish 1 message to partition-a and 1 to partition-b + msgA := queue.NewMessage("iso-msg-a", []byte("data-a"), "partition-a", nil) + msgB := queue.NewMessage("iso-msg-b", []byte("data-b"), "partition-b", nil) + require.NoError(t, publisher.Publish(s.ctx, topic, msgA)) + require.NoError(t, publisher.Publish(s.ctx, topic, msgB)) + t.Logf("Published 1 message to partition-a and 1 to partition-b") + + // Receive first delivery — hold it without acking (simulates slow processing) + first := receiveWithTimeout(t, deliveryChan, 5*time.Second) + t.Logf("First delivery received: partition=%s id=%s (holding without ack)", + first.Message().PartitionKey, first.Message().ID) + + // Receive second delivery — should arrive promptly even though first is unacked. + // If subscriber had head-of-line blocking, this would time out. + second := receiveWithTimeout(t, deliveryChan, 5*time.Second) + t.Logf("Second delivery received: partition=%s id=%s", + second.Message().PartitionKey, second.Message().ID) + + // Verify both partitions are represented + partitions := map[string]bool{ + first.Message().PartitionKey: true, + second.Message().PartitionKey: true, + } + assert.True(t, partitions["partition-a"], "should have delivery from partition-a") + assert.True(t, partitions["partition-b"], "should have delivery from partition-b") + + // Ack both + require.NoError(t, first.Ack(s.ctx)) + require.NoError(t, second.Ack(s.ctx)) + + t.Logf("Per-partition isolation verified: slow ack on one partition did not block the other") +} + +func (s *SQLQueueIntegrationSuite) TestSubscriberPartitionOrderPreserved() { + t := s.T() + + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, + Logger: zaptest.NewLogger(t), + MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q.Close() + + publisher := q.Publisher() + subscriber := q.Subscriber() + + topic := "subscriber_order_topic" + partitionKey := "ordered-part" + + // Publish 5 messages to the same partition + numMessages := 5 + publishedIDs := make([]string, numMessages) + for i := 0; i < numMessages; i++ { + msgID := fmt.Sprintf("order-msg-%03d", i) + publishedIDs[i] = msgID + msg := queue.NewMessage(msgID, []byte(fmt.Sprintf("payload-%d", i)), partitionKey, nil) + require.NoError(t, publisher.Publish(s.ctx, topic, msg)) + } + t.Logf("Published %d messages to partition %s", numMessages, partitionKey) + + // Subscribe and receive all + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "order-consumer") + subConfig.PollIntervalMs = 100 + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) + require.NoError(t, err) + + receivedIDs := make([]string, 0, numMessages) + receiveNWithTimeout(t, deliveryChan, numMessages, testTimeout, func(delivery extqueue.Delivery, index int) { + msgID := delivery.Message().ID + receivedIDs = append(receivedIDs, msgID) + t.Logf("Received: %s", msgID) + require.NoError(t, delivery.Ack(s.ctx)) + }) + + // Assert the received order matches publish order + for i := 0; i < numMessages; i++ { + assert.Equal(t, publishedIDs[i], receivedIDs[i], + "Message at position %d out of order: expected %s, got %s", + i, publishedIDs[i], receivedIDs[i]) + } + + t.Logf("Partition ordering verified: all %d messages received in publish order", numMessages) +} + func (s *SQLQueueIntegrationSuite) TestMultiplePartitions() { t := s.T()