From 7d17391f25bf01ab173be8f2c6edd666441778bc Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Sat, 21 Feb 2026 00:54:05 -0800 Subject: [PATCH] refactor(queue): move config to per-subscription SubscriptionConfig Different subscriptions may need different settings (timeouts, retry policies, batch sizes). Moving config from queue-level to Subscribe() enables per-topic customization without creating separate Queue instances. - Add SubscriptionConfig with SubscriberName, ConsumerGroup, polling, retry, and DLQ settings - Remove queue-level Config from SQL queue (now empty Params) - Update Subscriber.Subscribe() to accept SubscriptionConfig - Pass config parameters to stores instead of reading from struct fields - Update e2e tests to use new subscription-based config pattern - Fix: use fresh context with timeout for graceful lease release on shutdown to prevent context.Canceled errors leaving partitions orphaned --- e2e_test/queue/BUILD.bazel | 3 +- e2e_test/queue/queue_test.go | 173 ++++++++---------- extension/queue/BUILD.bazel | 13 +- extension/queue/sql/BUILD.bazel | 2 - extension/queue/sql/README.md | 58 ++++-- extension/queue/sql/config.go | 134 -------------- extension/queue/sql/config_test.go | 144 --------------- extension/queue/sql/message_store.go | 12 +- extension/queue/sql/message_store_test.go | 14 +- extension/queue/sql/mock_stores.go | 88 ++++----- extension/queue/sql/offset_store.go | 20 +- extension/queue/sql/offset_store_test.go | 26 +-- extension/queue/sql/partition_lease_store.go | 30 ++- .../queue/sql/partition_lease_store_test.go | 35 ++-- extension/queue/sql/publisher.go | 6 +- extension/queue/sql/publisher_test.go | 4 +- extension/queue/sql/sql.go | 22 +-- extension/queue/sql/sql_test.go | 27 --- extension/queue/sql/stores.go | 28 +-- extension/queue/sql/subscriber.go | 138 +++++++------- extension/queue/sql/subscriber_test.go | 32 ++-- extension/queue/subscriber.go | 7 +- extension/queue/subscription_config.go | 88 +++++++++ extension/queue/subscription_config_test.go | 141 ++++++++++++++ extension/storage/mysql/BUILD.bazel | 2 +- 25 files changed, 598 insertions(+), 649 deletions(-) delete mode 100644 extension/queue/sql/config.go delete mode 100644 extension/queue/sql/config_test.go create mode 100644 extension/queue/subscription_config.go create mode 100644 extension/queue/subscription_config_test.go diff --git a/e2e_test/queue/BUILD.bazel b/e2e_test/queue/BUILD.bazel index dd373428..7fe4a1e0 100644 --- a/e2e_test/queue/BUILD.bazel +++ b/e2e_test/queue/BUILD.bazel @@ -8,16 +8,15 @@ go_test( ], tags = ["integration"], deps = [ + "//e2e_test/testutil", "//entity/queue", "//extension/queue", "//extension/queue/sql", - "//e2e_test/testutil", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", "@com_github_testcontainers_testcontainers_go//:testcontainers-go", - "@com_github_testcontainers_testcontainers_go//network", "@com_github_testcontainers_testcontainers_go_modules_mysql//:mysql", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//zaptest", diff --git a/e2e_test/queue/queue_test.go b/e2e_test/queue/queue_test.go index 410b5b5f..10f121fa 100644 --- a/e2e_test/queue/queue_test.go +++ b/e2e_test/queue/queue_test.go @@ -97,12 +97,10 @@ func (s *QueueIntegrationSuite) TestPublishAndSubscribe() { t := s.T() // Create queue - config := queueSQL.DefaultConfig("test-consumer", "test-worker-1") q, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) defer q.Close() @@ -112,8 +110,9 @@ func (s *QueueIntegrationSuite) TestPublishAndSubscribe() { topic := "test_topic" - // Subscribe first - deliveryChan, err := subscriber.Subscribe(s.ctx, topic) + // Subscribe first with config + subConfig := extqueue.DefaultSubscriptionConfig("test-worker-1", "test-consumer") + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) // Publish messages with various metadata scenarios @@ -168,12 +167,10 @@ func (s *QueueIntegrationSuite) TestPublishAndSubscribe() { func (s *QueueIntegrationSuite) TestMultiplePartitions() { t := s.T() - config := queueSQL.DefaultConfig("multi-partition-consumer", "worker-1") q, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) defer q.Close() @@ -184,7 +181,8 @@ func (s *QueueIntegrationSuite) TestMultiplePartitions() { topic := "multi_partition_topic" // Subscribe - deliveryChan, err := subscriber.Subscribe(s.ctx, topic) + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "multi-partition-consumer") + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) // Publish messages to different partitions @@ -214,16 +212,10 @@ func (s *QueueIntegrationSuite) TestMultiplePartitions() { func (s *QueueIntegrationSuite) TestVisibilityTimeoutAndRetry() { t := s.T() - // Use short visibility timeout for faster test - config := queueSQL.DefaultConfig("retry-consumer", "worker-1") - config.VisibilityTimeout = 2 * time.Second - config.PollInterval = 100 * time.Millisecond - q, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) defer q.Close() @@ -233,8 +225,13 @@ func (s *QueueIntegrationSuite) TestVisibilityTimeoutAndRetry() { topic := "retry_topic" + // Use short visibility timeout for faster test + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "retry-consumer") + subConfig.VisibilityTimeoutMs = 2000 // 2 seconds + subConfig.PollIntervalMs = 100 // 100 milliseconds + // Subscribe - deliveryChan, err := subscriber.Subscribe(s.ctx, topic) + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) // Publish a message @@ -256,8 +253,8 @@ func (s *QueueIntegrationSuite) TestVisibilityTimeoutAndRetry() { require.NoError(t, err) // Wait for original visibility timeout to expire (but not the extended timeout) - t.Logf("Waiting for original visibility timeout (%v) - message should NOT reappear", config.VisibilityTimeout) - time.Sleep(config.VisibilityTimeout + 200*time.Millisecond) + t.Logf("Waiting for original visibility timeout (%v) - message should NOT reappear", time.Duration(subConfig.VisibilityTimeoutMs)*time.Millisecond) + time.Sleep(time.Duration(subConfig.VisibilityTimeoutMs)*time.Millisecond + 200*time.Millisecond) // Message should NOT be redelivered yet (visibility was extended) select { @@ -286,7 +283,7 @@ func (s *QueueIntegrationSuite) TestVisibilityTimeoutAndRetry() { // Wait for visibility timeout to expire t.Logf("Waiting for visibility timeout to expire...") - time.Sleep(config.VisibilityTimeout + 500*time.Millisecond) + time.Sleep(time.Duration(subConfig.VisibilityTimeoutMs)*time.Millisecond + 500*time.Millisecond) // Receive second time (retry) thirdDelivery := receiveWithTimeout(t, deliveryChan, 5*time.Second) @@ -302,14 +299,10 @@ func (s *QueueIntegrationSuite) TestVisibilityTimeoutAndRetry() { func (s *QueueIntegrationSuite) TestNackWithDelay() { t := s.T() - config := queueSQL.DefaultConfig("nack-consumer", "worker-1") - config.PollInterval = 100 * time.Millisecond - q, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) defer q.Close() @@ -320,7 +313,9 @@ func (s *QueueIntegrationSuite) TestNackWithDelay() { topic := "nack_topic" // Subscribe - deliveryChan, err := subscriber.Subscribe(s.ctx, topic) + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "nack-consumer") + subConfig.PollIntervalMs = 100 // 100 milliseconds + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) // Publish message @@ -356,12 +351,10 @@ func (s *QueueIntegrationSuite) TestNackWithDelay() { func (s *QueueIntegrationSuite) TestIdempotentPublish() { t := s.T() - config := queueSQL.DefaultConfig("idempotent-consumer", "worker-1") q, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) defer q.Close() @@ -372,7 +365,8 @@ func (s *QueueIntegrationSuite) TestIdempotentPublish() { topic := "idempotent_topic" // Subscribe - deliveryChan, err := subscriber.Subscribe(s.ctx, topic) + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "idempotent-consumer") + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) // Publish same message twice @@ -404,12 +398,10 @@ func (s *QueueIntegrationSuite) TestIdempotentPublish() { func (s *QueueIntegrationSuite) TestConcurrentPublishers() { t := s.T() - config := queueSQL.DefaultConfig("concurrent-consumer", "worker-1") q, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) defer q.Close() @@ -420,7 +412,8 @@ func (s *QueueIntegrationSuite) TestConcurrentPublishers() { topic := "concurrent_topic" // Subscribe - deliveryChan, err := subscriber.Subscribe(s.ctx, topic) + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "concurrent-consumer") + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) // Publish from multiple goroutines @@ -461,18 +454,10 @@ func (s *QueueIntegrationSuite) TestConcurrentPublishers() { func (s *QueueIntegrationSuite) TestCrashRecovery() { t := s.T() - // Use short timeouts for faster test - config := queueSQL.DefaultConfig("crash-consumer", "worker-1") - config.VisibilityTimeout = 2 * time.Second - config.PollInterval = 100 * time.Millisecond - config.LeaseDuration = 3 * time.Second // Short lease for testing crash recovery - config.LeaseRenewalInterval = 1 * time.Second // Must be less than LeaseDuration - q1, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) @@ -481,8 +466,15 @@ func (s *QueueIntegrationSuite) TestCrashRecovery() { topic := "crash_topic" + // Use short timeouts for faster test + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "crash-consumer") + subConfig.VisibilityTimeoutMs = 2000 // 2 seconds + subConfig.PollIntervalMs = 100 // 100 milliseconds + subConfig.LeaseDurationMs = 3000 // 3 seconds - short lease for testing crash recovery + subConfig.LeaseRenewalIntervalMs = 1000 // 1 second - must be less than LeaseDuration + // Subscribe with first worker - deliveryChan1, err := subscriber1.Subscribe(s.ctx, topic) + deliveryChan1, err := subscriber1.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) // Publish message @@ -499,28 +491,28 @@ func (s *QueueIntegrationSuite) TestCrashRecovery() { t.Logf("Worker 1 crashed (queue closed)") // Wait for both visibility timeout AND partition lease to expire - waitTime := config.LeaseDuration + config.VisibilityTimeout + time.Second + waitTime := time.Duration(subConfig.LeaseDurationMs+subConfig.VisibilityTimeoutMs)*time.Millisecond + time.Second t.Logf("Waiting %v for lease and visibility timeout to expire", waitTime) time.Sleep(waitTime) // Start worker 2 with same consumer group - config2 := queueSQL.DefaultConfig("crash-consumer", "worker-2") - config2.VisibilityTimeout = 2 * time.Second - config2.PollInterval = 100 * time.Millisecond - config2.LeaseDuration = 3 * time.Second - config2.LeaseRenewalInterval = 1 * time.Second - q2, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config2, }) require.NoError(t, err) defer q2.Close() subscriber2 := q2.Subscriber() - deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic) + + subConfig2 := extqueue.DefaultSubscriptionConfig("worker-2", "crash-consumer") + subConfig2.VisibilityTimeoutMs = 2000 // 2 seconds + subConfig2.PollIntervalMs = 100 // 100 milliseconds + subConfig2.LeaseDurationMs = 3000 // 3 seconds + subConfig2.LeaseRenewalIntervalMs = 1000 // 1 second + + deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic, subConfig2) require.NoError(t, err) // Worker 2 should receive the same message (recovery) @@ -540,22 +532,18 @@ func (s *QueueIntegrationSuite) TestMultipleConsumerGroups() { topic := "multi_group_topic" // Create two different consumer groups - config1 := queueSQL.DefaultConfig("group-A", "worker-1") q1, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config1, }) require.NoError(t, err) defer q1.Close() - config2 := queueSQL.DefaultConfig("group-B", "worker-1") q2, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config2, }) require.NoError(t, err) defer q2.Close() @@ -565,10 +553,12 @@ func (s *QueueIntegrationSuite) TestMultipleConsumerGroups() { subscriber2 := q2.Subscriber() // Subscribe both groups - deliveryChan1, err := subscriber1.Subscribe(s.ctx, topic) + subConfig1 := extqueue.DefaultSubscriptionConfig("worker-1", "group-A") + deliveryChan1, err := subscriber1.Subscribe(s.ctx, topic, subConfig1) require.NoError(t, err) - deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic) + subConfig2 := extqueue.DefaultSubscriptionConfig("worker-1", "group-B") + deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic, subConfig2) require.NoError(t, err) // Publish messages @@ -621,22 +611,18 @@ func (s *QueueIntegrationSuite) TestMultipleWorkersInConsumerGroup() { consumerGroup := "shared-group" // Create two workers in same consumer group - config1 := queueSQL.DefaultConfig(consumerGroup, "worker-1") q1, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config1, }) require.NoError(t, err) defer q1.Close() - config2 := queueSQL.DefaultConfig(consumerGroup, "worker-2") q2, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config2, }) require.NoError(t, err) defer q2.Close() @@ -646,10 +632,12 @@ func (s *QueueIntegrationSuite) TestMultipleWorkersInConsumerGroup() { subscriber2 := q2.Subscriber() // Subscribe both workers - deliveryChan1, err := subscriber1.Subscribe(s.ctx, topic) + subConfig1 := extqueue.DefaultSubscriptionConfig("worker-1", consumerGroup) + deliveryChan1, err := subscriber1.Subscribe(s.ctx, topic, subConfig1) require.NoError(t, err) - deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic) + subConfig2 := extqueue.DefaultSubscriptionConfig("worker-2", consumerGroup) + deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic, subConfig2) require.NoError(t, err) // Publish messages to different partitions so they can be distributed @@ -740,12 +728,10 @@ func (s *QueueIntegrationSuite) TestConcurrentSubscribers() { totalMessages := numSubscribers * messagesPerSubscriber // Create publisher - publisherConfig := queueSQL.DefaultConfig(consumerGroup, "publisher") pubQueue, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: publisherConfig, }) require.NoError(t, err) defer pubQueue.Close() @@ -757,18 +743,17 @@ func (s *QueueIntegrationSuite) TestConcurrentSubscribers() { var deliveryChans []<-chan extqueue.Delivery for i := 0; i < numSubscribers; i++ { - config := queueSQL.DefaultConfig(consumerGroup, fmt.Sprintf("worker-%d", i)) q, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) queues = append(queues, q) subscriber := q.Subscriber() - deliveryChan, err := subscriber.Subscribe(s.ctx, topic) + subConfig := extqueue.DefaultSubscriptionConfig(fmt.Sprintf("worker-%d", i), consumerGroup) + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) deliveryChans = append(deliveryChans, deliveryChan) } @@ -848,18 +833,10 @@ func (s *QueueIntegrationSuite) TestDeadLetterQueue() { topic := "dlq_topic" - // Configure with low max attempts and DLQ enabled - config := queueSQL.DefaultConfig("dlq-consumer", "worker-1") - config.PollInterval = 100 * time.Millisecond - config.VisibilityTimeout = 1 * time.Second - config.Retry.MaxAttempts = 2 // Only 2 attempts before DLQ - config.DLQ.Enabled = true - q, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) defer q.Close() @@ -867,8 +844,15 @@ func (s *QueueIntegrationSuite) TestDeadLetterQueue() { publisher := q.Publisher() subscriber := q.Subscriber() + // Configure with low max attempts and DLQ enabled + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "dlq-consumer") + subConfig.PollIntervalMs = 100 // 100 milliseconds + subConfig.VisibilityTimeoutMs = 1000 // 1 second + subConfig.Retry.MaxAttempts = 2 // Only 2 attempts before DLQ + subConfig.DLQ.Enabled = true + // Subscribe to main topic - deliveryChan, err := subscriber.Subscribe(s.ctx, topic) + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) // Publish a message that will fail @@ -878,7 +862,7 @@ func (s *QueueIntegrationSuite) TestDeadLetterQueue() { t.Logf("Published poison message, will nack repeatedly") // Receive and nack the message MaxAttempts times - for attempt := 1; attempt <= config.Retry.MaxAttempts; attempt++ { + for attempt := 1; attempt <= subConfig.Retry.MaxAttempts; attempt++ { delivery := receiveWithTimeout(t, deliveryChan, 10*time.Second) t.Logf("Attempt %d: received message, nacking", delivery.Attempt()) assert.Equal(t, attempt, delivery.Attempt()) @@ -888,11 +872,11 @@ func (s *QueueIntegrationSuite) TestDeadLetterQueue() { require.NoError(t, delivery.Nack(s.ctx, 0)) // Wait a bit for visibility timeout - time.Sleep(config.VisibilityTimeout + 200*time.Millisecond) + time.Sleep(time.Duration(subConfig.VisibilityTimeoutMs)*time.Millisecond + 200*time.Millisecond) } // After MaxAttempts, message should be moved to DLQ topic - t.Logf("Message should be moved to DLQ after %d failed attempts", config.Retry.MaxAttempts) + t.Logf("Message should be moved to DLQ after %d failed attempts", subConfig.Retry.MaxAttempts) // Should NOT receive on main topic anymore (message moved to DLQ) select { @@ -903,10 +887,11 @@ func (s *QueueIntegrationSuite) TestDeadLetterQueue() { } // Subscribe to DLQ topic to consume the failed message - dlqTopic := topic + config.DLQ.TopicSuffix + dlqTopic := topic + subConfig.DLQ.TopicSuffix t.Logf("Subscribing to DLQ topic: %s", dlqTopic) - dlqDeliveryChan, err := subscriber.Subscribe(s.ctx, dlqTopic) + dlqConfig := extqueue.DefaultSubscriptionConfig("worker-1", "dlq-consumer") + dlqDeliveryChan, err := subscriber.Subscribe(s.ctx, dlqTopic, dlqConfig) require.NoError(t, err) // Receive the message from DLQ @@ -924,7 +909,7 @@ func (s *QueueIntegrationSuite) TestDeadLetterQueue() { // Verify values assert.Equal(t, topic, metadata["dlq.original_topic"]) - assert.Equal(t, fmt.Sprintf("%d", config.Retry.MaxAttempts), metadata["dlq.failure_count"]) + assert.Equal(t, fmt.Sprintf("%d", subConfig.Retry.MaxAttempts), metadata["dlq.failure_count"]) assert.Equal(t, "exceeded retry limit", metadata["dlq.last_error"]) failedAt := metadata["dlq.failed_at"] @@ -944,12 +929,10 @@ func (s *QueueIntegrationSuite) TestMessageOrderingWithinPartition() { topic := "ordering_topic" partitionKey := "ordered-partition" - config := queueSQL.DefaultConfig("ordering-consumer", "worker-1") q, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) defer q.Close() @@ -958,7 +941,8 @@ func (s *QueueIntegrationSuite) TestMessageOrderingWithinPartition() { subscriber := q.Subscriber() // Subscribe first - deliveryChan, err := subscriber.Subscribe(s.ctx, topic) + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "ordering-consumer") + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) // Publish messages with same partition key (should be ordered) @@ -996,12 +980,10 @@ func (s *QueueIntegrationSuite) TestLateSubscriber() { topic := "late_subscriber_topic" - config := queueSQL.DefaultConfig("late-consumer", "worker-1") q, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) defer q.Close() @@ -1021,7 +1003,8 @@ func (s *QueueIntegrationSuite) TestLateSubscriber() { // Now subscribe (late subscriber) subscriber := q.Subscriber() - deliveryChan, err := subscriber.Subscribe(s.ctx, topic) + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "late-consumer") + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) t.Logf("Late subscriber joined after messages published") @@ -1048,13 +1031,10 @@ func (s *QueueIntegrationSuite) TestEmptyTopicSubscribe() { topic := "empty_topic" - config := queueSQL.DefaultConfig("empty-consumer", "worker-1") - config.PollInterval = 100 * time.Millisecond q, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) defer q.Close() @@ -1062,7 +1042,9 @@ func (s *QueueIntegrationSuite) TestEmptyTopicSubscribe() { subscriber := q.Subscriber() // Subscribe to empty topic (no messages published yet) - deliveryChan, err := subscriber.Subscribe(s.ctx, topic) + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "empty-consumer") + subConfig.PollIntervalMs = 100 // 100 milliseconds + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) t.Logf("Subscribed to empty topic") @@ -1093,13 +1075,10 @@ func (s *QueueIntegrationSuite) TestGracefulShutdownDuringProcessing() { topic := "shutdown_topic" - config := queueSQL.DefaultConfig("shutdown-consumer", "worker-1") - config.PollInterval = 100 * time.Millisecond q, err := queueSQL.NewQueue(queueSQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) @@ -1107,7 +1086,9 @@ func (s *QueueIntegrationSuite) TestGracefulShutdownDuringProcessing() { subscriber := q.Subscriber() // Subscribe - deliveryChan, err := subscriber.Subscribe(s.ctx, topic) + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "shutdown-consumer") + subConfig.PollIntervalMs = 100 // 100 milliseconds + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) // Publish messages @@ -1155,8 +1136,8 @@ drainLoop: // Wait for visibility timeout to expire so messages become visible again // All messages (in-flight + buffered) were fetched and marked invisible - t.Logf("Waiting for visibility timeout to expire (%v) so messages become visible again...", config.VisibilityTimeout) - time.Sleep(config.VisibilityTimeout + 500*time.Millisecond) + t.Logf("Waiting for visibility timeout to expire (%v) so messages become visible again...", time.Duration(subConfig.VisibilityTimeoutMs)*time.Millisecond) + time.Sleep(time.Duration(subConfig.VisibilityTimeoutMs)*time.Millisecond + 500*time.Millisecond) // Start new subscriber to verify all messages are redelivered t.Logf("Starting new subscriber to verify message recovery...") @@ -1164,13 +1145,13 @@ drainLoop: DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, - Config: config, }) require.NoError(t, err) defer q2.Close() subscriber2 := q2.Subscriber() - deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic) + subConfig2 := extqueue.DefaultSubscriptionConfig("worker-1", "shutdown-consumer") + deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic, subConfig2) require.NoError(t, err) // Receive all unprocessed messages (all should be redelivered after visibility timeout) diff --git a/extension/queue/BUILD.bazel b/extension/queue/BUILD.bazel index 0251e3b4..993dcbc9 100644 --- a/extension/queue/BUILD.bazel +++ b/extension/queue/BUILD.bazel @@ -1,4 +1,4 @@ -load("@rules_go//go:def.bzl", "go_library") +load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "queue", @@ -7,8 +7,19 @@ go_library( "publisher.go", "queue.go", "subscriber.go", + "subscription_config.go", ], importpath = "github.com/uber/submitqueue/extension/queue", visibility = ["//visibility:public"], deps = ["//entity/queue"], ) + +go_test( + name = "queue_test", + srcs = ["subscription_config_test.go"], + embed = [":queue"], + deps = [ + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/extension/queue/sql/BUILD.bazel b/extension/queue/sql/BUILD.bazel index 5b552d68..54cd048b 100644 --- a/extension/queue/sql/BUILD.bazel +++ b/extension/queue/sql/BUILD.bazel @@ -3,7 +3,6 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "sql", srcs = [ - "config.go", "constants.go", "errors.go", "message_store.go", @@ -30,7 +29,6 @@ go_library( go_test( name = "sql_test", srcs = [ - "config_test.go", "message_store_test.go", "offset_store_test.go", "partition_lease_store_test.go", diff --git a/extension/queue/sql/README.md b/extension/queue/sql/README.md index 8c9a9175..06eeeecc 100644 --- a/extension/queue/sql/README.md +++ b/extension/queue/sql/README.md @@ -16,25 +16,26 @@ import ( "database/sql" _ "github.com/go-sql-driver/mysql" queueSQL "github.com/uber/submitqueue/extension/queue/sql" + extqueue "github.com/uber/submitqueue/extension/queue" "github.com/uber/submitqueue/entity/queue" ) // Setup db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/db") q, _ := queueSQL.NewQueue(queueSQL.Params{ - DB: db, - Logger: logger, - Config: queueSQL.DefaultConfig("orchestrator", "worker-1"), + DB: db, + Logger: logger, + MetricsScope: metrics, }) defer q.Close() // Publish -msg := queue.NewMessage("msg-id", []byte(`{"data": "value"}`)) -msg.PartitionKey = "repo-123" // Required for ordering +msg := queue.NewMessage("msg-id", []byte(`{"data": "value"}`), "repo-123", nil) q.Publisher().Publish(ctx, "merge_events", msg) -// Subscribe -deliveryCh, _ := q.Subscriber().Subscribe(ctx, "merge_events") +// Subscribe with per-subscription config +subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "orchestrator") +deliveryCh, _ := q.Subscriber().Subscribe(ctx, "merge_events", subConfig) for delivery := range deliveryCh { if err := process(delivery.Message()); err != nil { delivery.Nack(ctx, 0) // Retry @@ -46,12 +47,45 @@ for delivery := range deliveryCh { ## Configuration +Per-subscription configuration enables different settings for each topic: + ```go -config := queueSQL.DefaultConfig("consumer-group", "worker-id") -config.PollInterval = 50 * time.Millisecond // Poll frequency -config.BatchSize = 20 // Messages per poll -config.VisibilityTimeout = 60 * time.Second // Retry delay -config.Retry.MaxAttempts = 3 // Max retries before DLQ +import extqueue "github.com/uber/submitqueue/extension/queue" + +// Default config +subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "consumer-group") + +// Customize for this subscription +subConfig.PollIntervalMs = 50 // Poll frequency (milliseconds) +subConfig.BatchSize = 20 // Messages per poll +subConfig.VisibilityTimeoutMs = 60000 // Retry delay (milliseconds) +subConfig.LeaseRenewalIntervalMs = 10000 // Lease renewal frequency (milliseconds) +subConfig.LeaseDurationMs = 30000 // Lease timeout (milliseconds) +subConfig.Retry.MaxAttempts = 3 // Max retries before DLQ +subConfig.Retry.InitialBackoffMs = 1000 // Initial retry backoff (milliseconds) +subConfig.Retry.MaxBackoffMs = 30000 // Max retry backoff (milliseconds) +subConfig.Retry.BackoffMultiplier = 2.0 // Backoff multiplier for exponential backoff +subConfig.DLQ.Enabled = true // Enable dead letter queue +subConfig.DLQ.TopicSuffix = "_dlq" // DLQ topic suffix + +// Use config when subscribing +deliveryCh, _ := q.Subscriber().Subscribe(ctx, "my-topic", subConfig) +``` + +**Key Configuration Fields:** + +- `SubscriberName`: Unique worker identifier for partition leasing (e.g., hostname, pod name) +- `ConsumerGroup`: Consumer group for independent offset tracking +- `PollIntervalMs`: How often to poll for new messages (milliseconds) +- `BatchSize`: Maximum messages to fetch per poll +- `VisibilityTimeoutMs`: How long messages are invisible after being fetched (milliseconds) +- `LeaseRenewalIntervalMs`: How often to renew partition leases (milliseconds) +- `LeaseDurationMs`: How long leases remain valid without renewal (milliseconds) +- `Retry.MaxAttempts`: Maximum processing attempts before moving to DLQ +- `Retry.InitialBackoffMs`: Initial retry backoff delay (milliseconds) +- `Retry.MaxBackoffMs`: Maximum retry backoff delay (milliseconds) +- `Retry.BackoffMultiplier`: Multiplier for exponential backoff +- `DLQ.TopicSuffix`: Suffix appended to topic name for DLQ (e.g., "orders" → "orders_dlq") ``` ## How It Works diff --git a/extension/queue/sql/config.go b/extension/queue/sql/config.go deleted file mode 100644 index aadca89b..00000000 --- a/extension/queue/sql/config.go +++ /dev/null @@ -1,134 +0,0 @@ -package sql - -import ( - "fmt" - "time" -) - -// Config holds configuration for SQL-based queue. -// DB connection, logger, and metrics are passed separately to NewQueue. -type Config struct { - // ConsumerGroup identifies this consumer for offset tracking (required) - ConsumerGroup string - - // WorkerID uniquely identifies this worker instance (required for partition leases) - // Example: hostname, pod name, UUID, etc. - WorkerID string - - // PollInterval is how often to poll for new messages - PollInterval time.Duration - - // BatchSize is the number of messages to fetch per poll - BatchSize int - - // VisibilityTimeout is how long a message is invisible after being fetched - // If worker crashes or gets stuck, message becomes visible again after this duration - VisibilityTimeout time.Duration - - // LeaseRenewalInterval is how often to renew partition leases - LeaseRenewalInterval time.Duration - - // LeaseDuration is how long a lease is valid without renewal - // Stale leases (not renewed within this duration) can be stolen by other workers - LeaseDuration time.Duration - - // Retry configuration for message retry - Retry RetryConfig - - // DLQ configuration - DLQ DLQConfig -} - -// RetryConfig configures message retry behavior -type RetryConfig struct { - // MaxAttempts is the maximum number of processing attempts - // After this many retries, message is moved to DLQ (if enabled) - // This includes both visibility timeout retries and explicit Nack retries - MaxAttempts int - - // InitialBackoff is the initial backoff duration for explicit Nack retries - InitialBackoff time.Duration - - // MaxBackoff is the maximum backoff duration - MaxBackoff time.Duration - - // BackoffMultiplier is the multiplier for exponential backoff - BackoffMultiplier float64 -} - -// DLQConfig configures dead letter queue -type DLQConfig struct { - // Enabled enables dead letter queue - Enabled bool - - // TopicSuffix is appended to the original topic name to create the DLQ topic - // For example, if original topic is "orders" and suffix is "_dlq", DLQ topic will be "orders_dlq" - TopicSuffix string -} - -// DefaultConfig returns a Config with sensible defaults -func DefaultConfig(consumerGroup, workerID string) Config { - return Config{ - ConsumerGroup: consumerGroup, - WorkerID: workerID, - PollInterval: 100 * time.Millisecond, - BatchSize: 10, - VisibilityTimeout: 60 * time.Second, - LeaseRenewalInterval: 10 * time.Second, - LeaseDuration: 30 * time.Second, - Retry: RetryConfig{ - MaxAttempts: 3, - InitialBackoff: 1 * time.Second, - MaxBackoff: 30 * time.Second, - BackoffMultiplier: 2.0, - }, - DLQ: DLQConfig{ - Enabled: true, - TopicSuffix: "_dlq", - }, - } -} - -// Validate checks if the configuration is valid -func (c *Config) Validate() error { - if c.ConsumerGroup == "" { - return fmt.Errorf("ConsumerGroup is required") - } - if c.WorkerID == "" { - return fmt.Errorf("WorkerID is required") - } - if c.PollInterval <= 0 { - return fmt.Errorf("PollInterval must be positive") - } - if c.BatchSize <= 0 { - return fmt.Errorf("BatchSize must be positive") - } - if c.VisibilityTimeout <= 0 { - return fmt.Errorf("VisibilityTimeout must be positive") - } - if c.LeaseRenewalInterval <= 0 { - return fmt.Errorf("LeaseRenewalInterval must be positive") - } - if c.LeaseDuration <= 0 { - return fmt.Errorf("LeaseDuration must be positive") - } - if c.LeaseRenewalInterval >= c.LeaseDuration { - return fmt.Errorf("LeaseRenewalInterval must be less than LeaseDuration") - } - if c.Retry.MaxAttempts < 1 { - return fmt.Errorf("Retry.MaxAttempts must be at least 1") - } - if c.Retry.InitialBackoff <= 0 { - return fmt.Errorf("Retry.InitialBackoff must be positive") - } - if c.Retry.MaxBackoff <= 0 { - return fmt.Errorf("Retry.MaxBackoff must be positive") - } - if c.Retry.BackoffMultiplier < 1.0 { - return fmt.Errorf("Retry.BackoffMultiplier must be >= 1.0") - } - if c.DLQ.Enabled && c.DLQ.TopicSuffix == "" { - return fmt.Errorf("DLQ.TopicSuffix is required when DLQ is enabled") - } - return nil -} diff --git a/extension/queue/sql/config_test.go b/extension/queue/sql/config_test.go deleted file mode 100644 index 6156e4a3..00000000 --- a/extension/queue/sql/config_test.go +++ /dev/null @@ -1,144 +0,0 @@ -package sql - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestDefaultConfig(t *testing.T) { - cfg := DefaultConfig("test-consumer", "test-worker") - - assert.Equal(t, "test-consumer", cfg.ConsumerGroup) - assert.Equal(t, "test-worker", cfg.WorkerID) - assert.Equal(t, 100*time.Millisecond, cfg.PollInterval) - assert.Equal(t, 10, cfg.BatchSize) - assert.Equal(t, 60*time.Second, cfg.VisibilityTimeout) - assert.Equal(t, 10*time.Second, cfg.LeaseRenewalInterval) - assert.Equal(t, 30*time.Second, cfg.LeaseDuration) - assert.True(t, cfg.DLQ.Enabled) - assert.Equal(t, "_dlq", cfg.DLQ.TopicSuffix) - assert.Equal(t, 3, cfg.Retry.MaxAttempts) - assert.Equal(t, 1*time.Second, cfg.Retry.InitialBackoff) - assert.Equal(t, 30*time.Second, cfg.Retry.MaxBackoff) - assert.Equal(t, 2.0, cfg.Retry.BackoffMultiplier) -} - -func TestConfigValidation(t *testing.T) { - tests := []struct { - name string - config Config - expectError bool - }{ - { - name: "valid config", - config: DefaultConfig("test-consumer", "test-worker"), - expectError: false, - }, - { - name: "empty consumer group", - config: Config{ - ConsumerGroup: "", - WorkerID: "test-worker", - PollInterval: 100 * time.Millisecond, - BatchSize: 10, - VisibilityTimeout: 60 * time.Second, - LeaseRenewalInterval: 10 * time.Second, - LeaseDuration: 30 * time.Second, - Retry: DefaultConfig("dummy", "dummy").Retry, - }, - expectError: true, - }, - { - name: "empty worker ID", - config: Config{ - ConsumerGroup: "test", - WorkerID: "", - PollInterval: 100 * time.Millisecond, - BatchSize: 10, - VisibilityTimeout: 60 * time.Second, - LeaseRenewalInterval: 10 * time.Second, - LeaseDuration: 30 * time.Second, - Retry: DefaultConfig("dummy", "dummy").Retry, - }, - expectError: true, - }, - { - name: "invalid poll interval", - config: Config{ - ConsumerGroup: "test", - WorkerID: "test-worker", - PollInterval: 0, - BatchSize: 10, - VisibilityTimeout: 60 * time.Second, - LeaseRenewalInterval: 10 * time.Second, - LeaseDuration: 30 * time.Second, - Retry: DefaultConfig("dummy", "dummy").Retry, - }, - expectError: true, - }, - { - name: "invalid batch size", - config: Config{ - ConsumerGroup: "test", - WorkerID: "test-worker", - PollInterval: 100 * time.Millisecond, - BatchSize: 0, - VisibilityTimeout: 60 * time.Second, - LeaseRenewalInterval: 10 * time.Second, - LeaseDuration: 30 * time.Second, - Retry: DefaultConfig("dummy", "dummy").Retry, - }, - expectError: true, - }, - { - name: "DLQ enabled without topic suffix", - config: Config{ - ConsumerGroup: "test", - WorkerID: "test-worker", - PollInterval: 100 * time.Millisecond, - BatchSize: 10, - VisibilityTimeout: 60 * time.Second, - LeaseRenewalInterval: 10 * time.Second, - LeaseDuration: 30 * time.Second, - Retry: DefaultConfig("dummy", "dummy").Retry, - DLQ: DLQConfig{ - Enabled: true, - TopicSuffix: "", // Missing suffix - }, - }, - expectError: true, - }, - { - name: "DLQ disabled without topic suffix - valid", - config: Config{ - ConsumerGroup: "test", - WorkerID: "test-worker", - PollInterval: 100 * time.Millisecond, - BatchSize: 10, - VisibilityTimeout: 60 * time.Second, - LeaseRenewalInterval: 10 * time.Second, - LeaseDuration: 30 * time.Second, - Retry: DefaultConfig("dummy", "dummy").Retry, - DLQ: DLQConfig{ - Enabled: false, - TopicSuffix: "", // Suffix not required when disabled - }, - }, - expectError: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := tt.config.Validate() - if tt.expectError { - require.Error(t, err) - } else { - require.NoError(t, err) - } - }) - } -} diff --git a/extension/queue/sql/message_store.go b/extension/queue/sql/message_store.go index bbccbbaf..ba501ec7 100644 --- a/extension/queue/sql/message_store.go +++ b/extension/queue/sql/message_store.go @@ -17,7 +17,6 @@ import ( // sqlmessageStore is the SQL implementation of messageStore type sqlmessageStore struct { db *sql.DB - config Config logger *zap.SugaredLogger metrics tally.Scope } @@ -30,10 +29,9 @@ const ( ) // newMessageStore creates a new SQL message store -func newMessageStore(db *sql.DB, config Config, logger *zap.Logger, metrics tally.Scope) messageStore { +func newMessageStore(db *sql.DB, logger *zap.Logger, metrics tally.Scope) messageStore { return &sqlmessageStore{ db: db, - config: config, logger: logger.Sugar().Named("message_store"), metrics: metrics.SubScope("message_store"), } @@ -181,7 +179,7 @@ func (s *sqlmessageStore) Delete(ctx context.Context, topic string, messageID st // FetchByOffset fetches visible messages with offset > currentOffset for a specific partition // Atomically sets invisible_until and increments retry_count for fetched messages -func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, limit int) ([]messageRow, error) { +func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, limit int, visibilityTimeoutMs int64) ([]messageRow, error) { start := time.Now() success := false defer func() { @@ -193,7 +191,7 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti }() now := start.UnixMilli() - invisibleUntil := now + s.config.VisibilityTimeout.Milliseconds() + invisibleUntil := now + visibilityTimeoutMs // Start transaction to atomically fetch and update messages tx, err := s.db.BeginTx(ctx, nil) @@ -360,7 +358,7 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti // MoveToDLQ atomically moves a message to the DLQ by reinserting it with the DLQ topic name // The message is inserted back into queue_messages table with the DLQ topic (original + suffix) // This allows DLQ messages to be consumed using the normal subscriber -func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID string, failureCount int, lastError string) error { +func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID string, failureCount int, lastError string, dlqTopicSuffix string) error { start := time.Now() success := false defer func() { @@ -372,7 +370,7 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID }() // Construct DLQ topic name - dlqTopic := topic + s.config.DLQ.TopicSuffix + dlqTopic := topic + dlqTopicSuffix tx, err := s.db.BeginTx(ctx, nil) if err != nil { diff --git a/extension/queue/sql/message_store_test.go b/extension/queue/sql/message_store_test.go index 8d0ad38a..561857a6 100644 --- a/extension/queue/sql/message_store_test.go +++ b/extension/queue/sql/message_store_test.go @@ -25,8 +25,7 @@ func setupmessageStoreTest(t *testing.T) (*sql.DB, sqlmock.Sqlmock, messageStore db, mock, err := sqlmock.New() require.NoError(t, err) - config := DefaultConfig("test-consumer", "test-worker") - store := newMessageStore(db, config, zaptest.NewLogger(t), testMetrics()) + store := newMessageStore(db, zaptest.NewLogger(t), testMetrics()) return db, mock, store } @@ -109,6 +108,7 @@ func TestmessageStore_FetchByOffset(t *testing.T) { partitionKey := "part1" currentOffset := int64(0) limit := 10 + visibilityTimeoutMs := int64(60000) // 60 seconds in milliseconds // Expect transaction begin mock.ExpectBegin() @@ -128,7 +128,7 @@ func TestmessageStore_FetchByOffset(t *testing.T) { // Expect commit mock.ExpectCommit() - results, err := store.FetchByOffset(ctx, topic, partitionKey, currentOffset, limit) + results, err := store.FetchByOffset(ctx, topic, partitionKey, currentOffset, limit, visibilityTimeoutMs) require.NoError(t, err) require.Len(t, results, 1) require.Equal(t, "msg1", results[0].ID) @@ -162,10 +162,8 @@ func TestmessageStore_MoveToDLQ(t *testing.T) { messageID := "msg1" failureCount := 3 lastError := "test error" - - // Get config to know the DLQ suffix - config := DefaultConfig("test-consumer", "test-worker") - dlqTopic := topic + config.DLQ.TopicSuffix // "test_topic_dlq" + dlqTopicSuffix := "_dlq" + dlqTopic := topic + dlqTopicSuffix // Expect transaction begin mock.ExpectBegin() @@ -193,7 +191,7 @@ func TestmessageStore_MoveToDLQ(t *testing.T) { // Expect commit mock.ExpectCommit() - err := store.MoveToDLQ(ctx, topic, messageID, failureCount, lastError) + err := store.MoveToDLQ(ctx, topic, messageID, failureCount, lastError, dlqTopicSuffix) require.NoError(t, err) require.NoError(t, mock.ExpectationsWereMet()) } diff --git a/extension/queue/sql/mock_stores.go b/extension/queue/sql/mock_stores.go index 6d00b04c..4c57d1e8 100644 --- a/extension/queue/sql/mock_stores.go +++ b/extension/queue/sql/mock_stores.go @@ -56,18 +56,18 @@ func (mr *MockmessageStoreMockRecorder) Delete(ctx, topic, messageID any) *gomoc } // FetchByOffset mocks base method. -func (m *MockmessageStore) FetchByOffset(ctx context.Context, topic, partitionKey string, currentOffset int64, limit int) ([]messageRow, error) { +func (m *MockmessageStore) FetchByOffset(ctx context.Context, topic, partitionKey string, currentOffset int64, limit int, visibilityTimeoutMs int64) ([]messageRow, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchByOffset", ctx, topic, partitionKey, currentOffset, limit) + ret := m.ctrl.Call(m, "FetchByOffset", ctx, topic, partitionKey, currentOffset, limit, visibilityTimeoutMs) ret0, _ := ret[0].([]messageRow) ret1, _ := ret[1].(error) return ret0, ret1 } // FetchByOffset indicates an expected call of FetchByOffset. -func (mr *MockmessageStoreMockRecorder) FetchByOffset(ctx, topic, partitionKey, currentOffset, limit any) *gomock.Call { +func (mr *MockmessageStoreMockRecorder) FetchByOffset(ctx, topic, partitionKey, currentOffset, limit, visibilityTimeoutMs any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchByOffset", reflect.TypeOf((*MockmessageStore)(nil).FetchByOffset), ctx, topic, partitionKey, currentOffset, limit) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchByOffset", reflect.TypeOf((*MockmessageStore)(nil).FetchByOffset), ctx, topic, partitionKey, currentOffset, limit, visibilityTimeoutMs) } // Insert mocks base method. @@ -85,17 +85,17 @@ func (mr *MockmessageStoreMockRecorder) Insert(ctx, topic, messages any) *gomock } // MoveToDLQ mocks base method. -func (m *MockmessageStore) MoveToDLQ(ctx context.Context, topic, messageID string, failureCount int, lastError string) error { +func (m *MockmessageStore) MoveToDLQ(ctx context.Context, topic, messageID string, failureCount int, lastError, dlqTopicSuffix string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MoveToDLQ", ctx, topic, messageID, failureCount, lastError) + ret := m.ctrl.Call(m, "MoveToDLQ", ctx, topic, messageID, failureCount, lastError, dlqTopicSuffix) ret0, _ := ret[0].(error) return ret0 } // MoveToDLQ indicates an expected call of MoveToDLQ. -func (mr *MockmessageStoreMockRecorder) MoveToDLQ(ctx, topic, messageID, failureCount, lastError any) *gomock.Call { +func (mr *MockmessageStoreMockRecorder) MoveToDLQ(ctx, topic, messageID, failureCount, lastError, dlqTopicSuffix any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveToDLQ", reflect.TypeOf((*MockmessageStore)(nil).MoveToDLQ), ctx, topic, messageID, failureCount, lastError) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveToDLQ", reflect.TypeOf((*MockmessageStore)(nil).MoveToDLQ), ctx, topic, messageID, failureCount, lastError, dlqTopicSuffix) } // SetVisibilityTimeout mocks base method. @@ -137,60 +137,60 @@ func (m *MockoffsetStore) EXPECT() *MockoffsetStoreMockRecorder { } // AckMessage mocks base method. -func (m *MockoffsetStore) AckMessage(ctx context.Context, topic, partitionKey, messageID string, offset int64, messageStore messageStore) error { +func (m *MockoffsetStore) AckMessage(ctx context.Context, topic, partitionKey, messageID string, offset int64, consumerGroup string, messageStore messageStore) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AckMessage", ctx, topic, partitionKey, messageID, offset, messageStore) + ret := m.ctrl.Call(m, "AckMessage", ctx, topic, partitionKey, messageID, offset, consumerGroup, messageStore) ret0, _ := ret[0].(error) return ret0 } // AckMessage indicates an expected call of AckMessage. -func (mr *MockoffsetStoreMockRecorder) AckMessage(ctx, topic, partitionKey, messageID, offset, messageStore any) *gomock.Call { +func (mr *MockoffsetStoreMockRecorder) AckMessage(ctx, topic, partitionKey, messageID, offset, consumerGroup, messageStore any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AckMessage", reflect.TypeOf((*MockoffsetStore)(nil).AckMessage), ctx, topic, partitionKey, messageID, offset, messageStore) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AckMessage", reflect.TypeOf((*MockoffsetStore)(nil).AckMessage), ctx, topic, partitionKey, messageID, offset, consumerGroup, messageStore) } // GetAckedOffset mocks base method. -func (m *MockoffsetStore) GetAckedOffset(ctx context.Context, topic, partitionKey string) (int64, error) { +func (m *MockoffsetStore) GetAckedOffset(ctx context.Context, topic, partitionKey, consumerGroup string) (int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetAckedOffset", ctx, topic, partitionKey) + ret := m.ctrl.Call(m, "GetAckedOffset", ctx, topic, partitionKey, consumerGroup) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } // GetAckedOffset indicates an expected call of GetAckedOffset. -func (mr *MockoffsetStoreMockRecorder) GetAckedOffset(ctx, topic, partitionKey any) *gomock.Call { +func (mr *MockoffsetStoreMockRecorder) GetAckedOffset(ctx, topic, partitionKey, consumerGroup any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAckedOffset", reflect.TypeOf((*MockoffsetStore)(nil).GetAckedOffset), ctx, topic, partitionKey) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAckedOffset", reflect.TypeOf((*MockoffsetStore)(nil).GetAckedOffset), ctx, topic, partitionKey, consumerGroup) } // Initialize mocks base method. -func (m *MockoffsetStore) Initialize(ctx context.Context, topic, partitionKey string) error { +func (m *MockoffsetStore) Initialize(ctx context.Context, topic, partitionKey, consumerGroup string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Initialize", ctx, topic, partitionKey) + ret := m.ctrl.Call(m, "Initialize", ctx, topic, partitionKey, consumerGroup) ret0, _ := ret[0].(error) return ret0 } // Initialize indicates an expected call of Initialize. -func (mr *MockoffsetStoreMockRecorder) Initialize(ctx, topic, partitionKey any) *gomock.Call { +func (mr *MockoffsetStoreMockRecorder) Initialize(ctx, topic, partitionKey, consumerGroup any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Initialize", reflect.TypeOf((*MockoffsetStore)(nil).Initialize), ctx, topic, partitionKey) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Initialize", reflect.TypeOf((*MockoffsetStore)(nil).Initialize), ctx, topic, partitionKey, consumerGroup) } // UpdateAckedOffset mocks base method. -func (m *MockoffsetStore) UpdateAckedOffset(ctx context.Context, topic, partitionKey string, offset int64) error { +func (m *MockoffsetStore) UpdateAckedOffset(ctx context.Context, topic, partitionKey string, offset int64, consumerGroup string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateAckedOffset", ctx, topic, partitionKey, offset) + ret := m.ctrl.Call(m, "UpdateAckedOffset", ctx, topic, partitionKey, offset, consumerGroup) ret0, _ := ret[0].(error) return ret0 } // UpdateAckedOffset indicates an expected call of UpdateAckedOffset. -func (mr *MockoffsetStoreMockRecorder) UpdateAckedOffset(ctx, topic, partitionKey, offset any) *gomock.Call { +func (mr *MockoffsetStoreMockRecorder) UpdateAckedOffset(ctx, topic, partitionKey, offset, consumerGroup any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAckedOffset", reflect.TypeOf((*MockoffsetStore)(nil).UpdateAckedOffset), ctx, topic, partitionKey, offset) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAckedOffset", reflect.TypeOf((*MockoffsetStore)(nil).UpdateAckedOffset), ctx, topic, partitionKey, offset, consumerGroup) } // MockpartitionLeaseStore is a mock of partitionLeaseStore interface. @@ -218,74 +218,74 @@ func (m *MockpartitionLeaseStore) EXPECT() *MockpartitionLeaseStoreMockRecorder } // DiscoverAndAcquirePartitions mocks base method. -func (m *MockpartitionLeaseStore) DiscoverAndAcquirePartitions(ctx context.Context, topic string) (int, error) { +func (m *MockpartitionLeaseStore) DiscoverAndAcquirePartitions(ctx context.Context, topic, subscriberName, consumerGroup string, leaseDurationMs int64) (int, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DiscoverAndAcquirePartitions", ctx, topic) + ret := m.ctrl.Call(m, "DiscoverAndAcquirePartitions", ctx, topic, subscriberName, consumerGroup, leaseDurationMs) ret0, _ := ret[0].(int) ret1, _ := ret[1].(error) return ret0, ret1 } // DiscoverAndAcquirePartitions indicates an expected call of DiscoverAndAcquirePartitions. -func (mr *MockpartitionLeaseStoreMockRecorder) DiscoverAndAcquirePartitions(ctx, topic any) *gomock.Call { +func (mr *MockpartitionLeaseStoreMockRecorder) DiscoverAndAcquirePartitions(ctx, topic, subscriberName, consumerGroup, leaseDurationMs any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiscoverAndAcquirePartitions", reflect.TypeOf((*MockpartitionLeaseStore)(nil).DiscoverAndAcquirePartitions), ctx, topic) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiscoverAndAcquirePartitions", reflect.TypeOf((*MockpartitionLeaseStore)(nil).DiscoverAndAcquirePartitions), ctx, topic, subscriberName, consumerGroup, leaseDurationMs) } // GetLeasedPartitions mocks base method. -func (m *MockpartitionLeaseStore) GetLeasedPartitions(ctx context.Context, topic string) ([]string, error) { +func (m *MockpartitionLeaseStore) GetLeasedPartitions(ctx context.Context, topic, subscriberName, consumerGroup string) ([]string, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetLeasedPartitions", ctx, topic) + ret := m.ctrl.Call(m, "GetLeasedPartitions", ctx, topic, subscriberName, consumerGroup) ret0, _ := ret[0].([]string) ret1, _ := ret[1].(error) return ret0, ret1 } // GetLeasedPartitions indicates an expected call of GetLeasedPartitions. -func (mr *MockpartitionLeaseStoreMockRecorder) GetLeasedPartitions(ctx, topic any) *gomock.Call { +func (mr *MockpartitionLeaseStoreMockRecorder) GetLeasedPartitions(ctx, topic, subscriberName, consumerGroup any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLeasedPartitions", reflect.TypeOf((*MockpartitionLeaseStore)(nil).GetLeasedPartitions), ctx, topic) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLeasedPartitions", reflect.TypeOf((*MockpartitionLeaseStore)(nil).GetLeasedPartitions), ctx, topic, subscriberName, consumerGroup) } // ReleaseLease mocks base method. -func (m *MockpartitionLeaseStore) ReleaseLease(ctx context.Context, topic, partitionKey string) error { +func (m *MockpartitionLeaseStore) ReleaseLease(ctx context.Context, topic, partitionKey, subscriberName, consumerGroup string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReleaseLease", ctx, topic, partitionKey) + ret := m.ctrl.Call(m, "ReleaseLease", ctx, topic, partitionKey, subscriberName, consumerGroup) ret0, _ := ret[0].(error) return ret0 } // ReleaseLease indicates an expected call of ReleaseLease. -func (mr *MockpartitionLeaseStoreMockRecorder) ReleaseLease(ctx, topic, partitionKey any) *gomock.Call { +func (mr *MockpartitionLeaseStoreMockRecorder) ReleaseLease(ctx, topic, partitionKey, subscriberName, consumerGroup any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReleaseLease", reflect.TypeOf((*MockpartitionLeaseStore)(nil).ReleaseLease), ctx, topic, partitionKey) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReleaseLease", reflect.TypeOf((*MockpartitionLeaseStore)(nil).ReleaseLease), ctx, topic, partitionKey, subscriberName, consumerGroup) } // RenewLease mocks base method. -func (m *MockpartitionLeaseStore) RenewLease(ctx context.Context, topic, partitionKey string) error { +func (m *MockpartitionLeaseStore) RenewLease(ctx context.Context, topic, partitionKey, subscriberName, consumerGroup string, leaseDurationMs int64) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RenewLease", ctx, topic, partitionKey) + ret := m.ctrl.Call(m, "RenewLease", ctx, topic, partitionKey, subscriberName, consumerGroup, leaseDurationMs) ret0, _ := ret[0].(error) return ret0 } // RenewLease indicates an expected call of RenewLease. -func (mr *MockpartitionLeaseStoreMockRecorder) RenewLease(ctx, topic, partitionKey any) *gomock.Call { +func (mr *MockpartitionLeaseStoreMockRecorder) RenewLease(ctx, topic, partitionKey, subscriberName, consumerGroup, leaseDurationMs any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RenewLease", reflect.TypeOf((*MockpartitionLeaseStore)(nil).RenewLease), ctx, topic, partitionKey) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RenewLease", reflect.TypeOf((*MockpartitionLeaseStore)(nil).RenewLease), ctx, topic, partitionKey, subscriberName, consumerGroup, leaseDurationMs) } // TryAcquireLease mocks base method. -func (m *MockpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic, partitionKey string) (bool, error) { +func (m *MockpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic, partitionKey, subscriberName, consumerGroup string, leaseDurationMs int64) (bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TryAcquireLease", ctx, topic, partitionKey) + ret := m.ctrl.Call(m, "TryAcquireLease", ctx, topic, partitionKey, subscriberName, consumerGroup, leaseDurationMs) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(error) return ret0, ret1 } // TryAcquireLease indicates an expected call of TryAcquireLease. -func (mr *MockpartitionLeaseStoreMockRecorder) TryAcquireLease(ctx, topic, partitionKey any) *gomock.Call { +func (mr *MockpartitionLeaseStoreMockRecorder) TryAcquireLease(ctx, topic, partitionKey, subscriberName, consumerGroup, leaseDurationMs any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryAcquireLease", reflect.TypeOf((*MockpartitionLeaseStore)(nil).TryAcquireLease), ctx, topic, partitionKey) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryAcquireLease", reflect.TypeOf((*MockpartitionLeaseStore)(nil).TryAcquireLease), ctx, topic, partitionKey, subscriberName, consumerGroup, leaseDurationMs) } diff --git a/extension/queue/sql/offset_store.go b/extension/queue/sql/offset_store.go index 33c86930..f0daeedf 100644 --- a/extension/queue/sql/offset_store.go +++ b/extension/queue/sql/offset_store.go @@ -14,7 +14,6 @@ import ( // sqloffsetStore is the SQL implementation of offsetStore type sqloffsetStore struct { db *sql.DB - config Config logger *zap.SugaredLogger metrics tally.Scope } @@ -25,34 +24,33 @@ const ( ) // newOffsetStore creates a new SQL offset store -func newOffsetStore(db *sql.DB, config Config, logger *zap.Logger, metrics tally.Scope) offsetStore { +func newOffsetStore(db *sql.DB, logger *zap.Logger, metrics tally.Scope) offsetStore { return &sqloffsetStore{ db: db, - config: config, logger: logger.Sugar().Named("offset_store"), metrics: metrics.SubScope("offset_store"), } } // Initialize creates an offset entry for a topic+partition if it doesn't exist -func (s *sqloffsetStore) Initialize(ctx context.Context, topic string, partitionKey string) error { +func (s *sqloffsetStore) Initialize(ctx context.Context, topic string, partitionKey string, consumerGroup string) error { now := time.Now().UnixMilli() // Try to insert, ignore if already exists _, err := s.db.ExecContext(ctx, fmt.Sprintf(` INSERT IGNORE INTO %s (consumer_group, topic, partition_key, offset_acked, updated_at) VALUES (?, ?, ?, 0, ?) - `, OffsetsTableName), s.config.ConsumerGroup, topic, partitionKey, now) + `, OffsetsTableName), consumerGroup, topic, partitionKey, now) return err } // GetAckedOffset returns the current acked offset for a topic+partition -func (s *sqloffsetStore) GetAckedOffset(ctx context.Context, topic string, partitionKey string) (int64, error) { +func (s *sqloffsetStore) GetAckedOffset(ctx context.Context, topic string, partitionKey string, consumerGroup string) (int64, error) { var offset int64 err := s.db.QueryRowContext(ctx, fmt.Sprintf(` SELECT offset_acked FROM %s WHERE consumer_group = ? AND topic = ? AND partition_key = ? - `, OffsetsTableName), s.config.ConsumerGroup, topic, partitionKey).Scan(&offset) + `, OffsetsTableName), consumerGroup, topic, partitionKey).Scan(&offset) if err == sql.ErrNoRows { // Partition not yet initialized, return 0 @@ -67,20 +65,20 @@ func (s *sqloffsetStore) GetAckedOffset(ctx context.Context, topic string, parti } // UpdateAckedOffset updates the offset_acked for a topic+partition (only if new offset is greater) -func (s *sqloffsetStore) UpdateAckedOffset(ctx context.Context, topic string, partitionKey string, offset int64) error { +func (s *sqloffsetStore) UpdateAckedOffset(ctx context.Context, topic string, partitionKey string, offset int64, consumerGroup string) error { now := time.Now().UnixMilli() _, err := s.db.ExecContext(ctx, fmt.Sprintf(` UPDATE %s SET offset_acked = ?, updated_at = ? WHERE consumer_group = ? AND topic = ? AND partition_key = ? AND offset_acked < ? - `, OffsetsTableName), offset, now, s.config.ConsumerGroup, topic, partitionKey, offset) + `, OffsetsTableName), offset, now, consumerGroup, topic, partitionKey, offset) return err } // AckMessage atomically deletes a message and updates the acked offset -func (s *sqloffsetStore) AckMessage(ctx context.Context, topic string, partitionKey string, messageID string, offset int64, messageStore messageStore) error { +func (s *sqloffsetStore) AckMessage(ctx context.Context, topic string, partitionKey string, messageID string, offset int64, consumerGroup string, messageStore messageStore) error { start := time.Now() success := false defer func() { @@ -116,7 +114,7 @@ func (s *sqloffsetStore) AckMessage(ctx context.Context, topic string, partition ON DUPLICATE KEY UPDATE offset_acked = IF(VALUES(offset_acked) > offset_acked, VALUES(offset_acked), offset_acked), updated_at = VALUES(updated_at) - `, OffsetsTableName), s.config.ConsumerGroup, topic, partitionKey, offset, now) + `, OffsetsTableName), consumerGroup, topic, partitionKey, offset, now) if err != nil { s.metrics.Tagged(map[string]string{tagErrorType: "update_offset"}).Counter(metricAckMessageErrors).Inc(1) return fmt.Errorf("failed to update offset: %w", err) diff --git a/extension/queue/sql/offset_store_test.go b/extension/queue/sql/offset_store_test.go index 300720dc..9cba032d 100644 --- a/extension/queue/sql/offset_store_test.go +++ b/extension/queue/sql/offset_store_test.go @@ -10,14 +10,18 @@ import ( "go.uber.org/zap/zaptest" ) +const ( + testConsumerGroup = "test-consumer" + testSubscriberName = "test-subscriber" +) + func setupoffsetStoreTest(t *testing.T) (*sql.DB, sqlmock.Sqlmock, offsetStore) { t.Helper() db, mock, err := sqlmock.New() require.NoError(t, err) - config := DefaultConfig("test-consumer", "test-worker") - store := newOffsetStore(db, config, zaptest.NewLogger(t), testMetrics()) + store := newOffsetStore(db, zaptest.NewLogger(t), testMetrics()) return db, mock, store } @@ -31,10 +35,10 @@ func TestoffsetStore_Initialize(t *testing.T) { partitionKey := "part1" mock.ExpectExec("INSERT IGNORE INTO queue_offsets"). - WithArgs("test-consumer", topic, partitionKey, sqlmock.AnyArg()). + WithArgs(testConsumerGroup, topic, partitionKey, sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(1, 1)) - err := store.Initialize(ctx, topic, partitionKey) + err := store.Initialize(ctx, topic, partitionKey, testConsumerGroup) require.NoError(t, err) require.NoError(t, mock.ExpectationsWereMet()) } @@ -51,7 +55,7 @@ func TestoffsetStore_GetAckedOffset(t *testing.T) { setup: func(mock sqlmock.Sqlmock) { rows := sqlmock.NewRows([]string{"offset_acked"}).AddRow(int64(100)) mock.ExpectQuery("SELECT offset_acked FROM queue_offsets"). - WithArgs("test-consumer", "test_topic", "part1"). + WithArgs(testConsumerGroup, "test_topic", "part1"). WillReturnRows(rows) }, expectedOffset: 100, @@ -61,7 +65,7 @@ func TestoffsetStore_GetAckedOffset(t *testing.T) { name: "offset not found returns zero", setup: func(mock sqlmock.Sqlmock) { mock.ExpectQuery("SELECT offset_acked FROM queue_offsets"). - WithArgs("test-consumer", "test_topic", "part1"). + WithArgs(testConsumerGroup, "test_topic", "part1"). WillReturnError(sql.ErrNoRows) }, expectedOffset: 0, @@ -80,7 +84,7 @@ func TestoffsetStore_GetAckedOffset(t *testing.T) { tt.setup(mock) - offset, err := store.GetAckedOffset(ctx, topic, partitionKey) + offset, err := store.GetAckedOffset(ctx, topic, partitionKey, testConsumerGroup) if tt.wantErr { require.Error(t, err) } else { @@ -102,10 +106,10 @@ func TestoffsetStore_UpdateAckedOffset(t *testing.T) { offset := int64(150) mock.ExpectExec("UPDATE queue_offsets"). - WithArgs(offset, sqlmock.AnyArg(), "test-consumer", topic, partitionKey, offset). + WithArgs(offset, sqlmock.AnyArg(), testConsumerGroup, topic, partitionKey, offset). WillReturnResult(sqlmock.NewResult(0, 1)) - err := store.UpdateAckedOffset(ctx, topic, partitionKey, offset) + err := store.UpdateAckedOffset(ctx, topic, partitionKey, offset, testConsumerGroup) require.NoError(t, err) require.NoError(t, mock.ExpectationsWereMet()) } @@ -124,7 +128,7 @@ func TestoffsetStore_AckMessage(t *testing.T) { WithArgs("test_topic", "part1", "msg1"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectExec("INSERT INTO queue_offsets"). - WithArgs("test-consumer", "test_topic", "part1", int64(100), sqlmock.AnyArg()). + WithArgs(testConsumerGroup, "test_topic", "part1", int64(100), sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() }, @@ -156,7 +160,7 @@ func TestoffsetStore_AckMessage(t *testing.T) { tt.setup(mock) - err := store.AckMessage(ctx, topic, partitionKey, messageID, offset, nil) + err := store.AckMessage(ctx, topic, partitionKey, messageID, offset, testConsumerGroup, nil) if tt.wantErr { require.Error(t, err) } else { diff --git a/extension/queue/sql/partition_lease_store.go b/extension/queue/sql/partition_lease_store.go index 08c11a3a..7c9097e5 100644 --- a/extension/queue/sql/partition_lease_store.go +++ b/extension/queue/sql/partition_lease_store.go @@ -14,7 +14,6 @@ import ( // sqlpartitionLeaseStore is the SQL implementation of partitionLeaseStore type sqlpartitionLeaseStore struct { db *sql.DB - config Config logger *zap.SugaredLogger metrics tally.Scope } @@ -28,17 +27,16 @@ const ( ) // newPartitionLeaseStore creates a new SQL partition lease store -func newPartitionLeaseStore(db *sql.DB, config Config, logger *zap.Logger, metrics tally.Scope) partitionLeaseStore { +func newPartitionLeaseStore(db *sql.DB, logger *zap.Logger, metrics tally.Scope) partitionLeaseStore { return &sqlpartitionLeaseStore{ db: db, - config: config, logger: logger.Sugar().Named("partition_lease_store"), metrics: metrics.SubScope("partition_lease_store"), } } // TryAcquireLease attempts to acquire or renew a lease for a partition -func (s *sqlpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic string, partitionKey string) (bool, error) { +func (s *sqlpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic string, partitionKey string, subscriberName string, consumerGroup string, leaseDurationMs int64) (bool, error) { start := time.Now() success := false defer func() { @@ -50,7 +48,7 @@ func (s *sqlpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic stri }() now := start.UnixMilli() - staleThreshold := now - s.config.LeaseDuration.Milliseconds() + staleThreshold := now - leaseDurationMs // Try to insert or update stale lease _, err := s.db.ExecContext(ctx, fmt.Sprintf(` @@ -61,7 +59,7 @@ func (s *sqlpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic stri leased_at = IF(lease_renewed_at < ?, VALUES(leased_at), leased_at), lease_renewed_at = IF(lease_renewed_at < ?, VALUES(lease_renewed_at), lease_renewed_at) `, PartitionLeasesTableName), - s.config.ConsumerGroup, topic, partitionKey, s.config.WorkerID, now, now, + consumerGroup, topic, partitionKey, subscriberName, now, now, staleThreshold, staleThreshold, staleThreshold) if err != nil { @@ -79,7 +77,7 @@ func (s *sqlpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic stri err = s.db.QueryRowContext(ctx, fmt.Sprintf(` SELECT leased_by FROM %s WHERE consumer_group = ? AND topic = ? AND partition_key = ? - `, PartitionLeasesTableName), s.config.ConsumerGroup, topic, partitionKey).Scan(&owner) + `, PartitionLeasesTableName), consumerGroup, topic, partitionKey).Scan(&owner) if err != nil { s.logger.Errorw("failed to check lease ownership", @@ -91,7 +89,7 @@ func (s *sqlpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic stri return false, fmt.Errorf("failed to check lease ownership: %w", err) } - acquired := owner == s.config.WorkerID + acquired := owner == subscriberName if acquired { s.metrics.Counter("try_acquire_lease.acquired").Inc(1) s.logger.Debugw("acquired lease", @@ -107,7 +105,7 @@ func (s *sqlpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic stri } // RenewLease renews the lease for a partition owned by this worker -func (s *sqlpartitionLeaseStore) RenewLease(ctx context.Context, topic string, partitionKey string) error { +func (s *sqlpartitionLeaseStore) RenewLease(ctx context.Context, topic string, partitionKey string, subscriberName string, consumerGroup string, leaseDurationMs int64) error { start := time.Now() success := false defer func() { @@ -124,7 +122,7 @@ func (s *sqlpartitionLeaseStore) RenewLease(ctx context.Context, topic string, p UPDATE %s SET lease_renewed_at = ? WHERE consumer_group = ? AND topic = ? AND partition_key = ? AND leased_by = ? - `, PartitionLeasesTableName), now, s.config.ConsumerGroup, topic, partitionKey, s.config.WorkerID) + `, PartitionLeasesTableName), now, consumerGroup, topic, partitionKey, subscriberName) if err != nil { s.logger.Errorw("failed to renew lease", @@ -168,7 +166,7 @@ func (s *sqlpartitionLeaseStore) RenewLease(ctx context.Context, topic string, p } // ReleaseLease releases the lease for a partition owned by this worker -func (s *sqlpartitionLeaseStore) ReleaseLease(ctx context.Context, topic string, partitionKey string) error { +func (s *sqlpartitionLeaseStore) ReleaseLease(ctx context.Context, topic string, partitionKey string, subscriberName string, consumerGroup string) error { start := time.Now() success := false defer func() { @@ -182,7 +180,7 @@ func (s *sqlpartitionLeaseStore) ReleaseLease(ctx context.Context, topic string, result, err := s.db.ExecContext(ctx, fmt.Sprintf(` DELETE FROM %s WHERE consumer_group = ? AND topic = ? AND partition_key = ? AND leased_by = ? - `, PartitionLeasesTableName), s.config.ConsumerGroup, topic, partitionKey, s.config.WorkerID) + `, PartitionLeasesTableName), consumerGroup, topic, partitionKey, subscriberName) if err != nil { s.logger.Errorw("failed to release lease", @@ -210,7 +208,7 @@ func (s *sqlpartitionLeaseStore) ReleaseLease(ctx context.Context, topic string, } // GetLeasedPartitions returns all partitions currently leased by this worker -func (s *sqlpartitionLeaseStore) GetLeasedPartitions(ctx context.Context, topic string) ([]string, error) { +func (s *sqlpartitionLeaseStore) GetLeasedPartitions(ctx context.Context, topic string, subscriberName string, consumerGroup string) ([]string, error) { start := time.Now() success := false defer func() { @@ -224,7 +222,7 @@ func (s *sqlpartitionLeaseStore) GetLeasedPartitions(ctx context.Context, topic rows, err := s.db.QueryContext(ctx, fmt.Sprintf(` SELECT partition_key FROM %s WHERE consumer_group = ? AND topic = ? AND leased_by = ? - `, PartitionLeasesTableName), s.config.ConsumerGroup, topic, s.config.WorkerID) + `, PartitionLeasesTableName), consumerGroup, topic, subscriberName) if err != nil { s.logger.Errorw("failed to get leased partitions", @@ -264,7 +262,7 @@ func (s *sqlpartitionLeaseStore) GetLeasedPartitions(ctx context.Context, topic // DiscoverAndAcquirePartitions discovers partitions from messages table and tries to acquire leases // Returns the number of new leases acquired -func (s *sqlpartitionLeaseStore) DiscoverAndAcquirePartitions(ctx context.Context, topic string) (int, error) { +func (s *sqlpartitionLeaseStore) DiscoverAndAcquirePartitions(ctx context.Context, topic string, subscriberName string, consumerGroup string, leaseDurationMs int64) (int, error) { start := time.Now() success := false defer func() { @@ -313,7 +311,7 @@ func (s *sqlpartitionLeaseStore) DiscoverAndAcquirePartitions(ctx context.Contex // Try to acquire leases for discovered partitions acquiredCount := 0 for _, partitionKey := range partitions { - acquired, err := s.TryAcquireLease(ctx, topic, partitionKey) + acquired, err := s.TryAcquireLease(ctx, topic, partitionKey, subscriberName, consumerGroup, leaseDurationMs) if err != nil { // Log but continue trying other partitions s.logger.Warnw("failed to acquire lease for partition", diff --git a/extension/queue/sql/partition_lease_store_test.go b/extension/queue/sql/partition_lease_store_test.go index 93b62d93..6142b3d9 100644 --- a/extension/queue/sql/partition_lease_store_test.go +++ b/extension/queue/sql/partition_lease_store_test.go @@ -11,14 +11,15 @@ import ( "go.uber.org/zap/zaptest" ) +const testLeaseDurationMs = 30000 // 30 seconds in milliseconds + func setuppartitionLeaseStoreTest(t *testing.T) (*sql.DB, sqlmock.Sqlmock, partitionLeaseStore) { t.Helper() db, mock, err := sqlmock.New() require.NoError(t, err) - config := DefaultConfig("test-consumer", "test-worker") - store := newPartitionLeaseStore(db, config, zaptest.NewLogger(t), tally.NoopScope) + store := newPartitionLeaseStore(db, zaptest.NewLogger(t), tally.NoopScope) return db, mock, store } @@ -34,11 +35,11 @@ func TestpartitionLeaseStore_TryAcquireLease(t *testing.T) { name: "successfully acquire lease", setup: func(mock sqlmock.Sqlmock) { mock.ExpectExec("INSERT INTO queue_partition_leases"). - WithArgs("test-consumer", "test_topic", "part1", "test-worker", sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()). + WithArgs(testConsumerGroup, "test_topic", "part1", testSubscriberName, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(1, 1)) - rows := sqlmock.NewRows([]string{"leased_by"}).AddRow("test-worker") + rows := sqlmock.NewRows([]string{"leased_by"}).AddRow(testSubscriberName) mock.ExpectQuery("SELECT leased_by FROM queue_partition_leases"). - WithArgs("test-consumer", "test_topic", "part1"). + WithArgs(testConsumerGroup, "test_topic", "part1"). WillReturnRows(rows) }, acquired: true, @@ -51,7 +52,7 @@ func TestpartitionLeaseStore_TryAcquireLease(t *testing.T) { WillReturnResult(sqlmock.NewResult(1, 1)) rows := sqlmock.NewRows([]string{"leased_by"}).AddRow("other-worker") mock.ExpectQuery("SELECT leased_by FROM queue_partition_leases"). - WithArgs("test-consumer", "test_topic", "part1"). + WithArgs(testConsumerGroup, "test_topic", "part1"). WillReturnRows(rows) }, acquired: false, @@ -70,7 +71,7 @@ func TestpartitionLeaseStore_TryAcquireLease(t *testing.T) { tt.setup(mock) - acquired, err := store.TryAcquireLease(ctx, topic, partitionKey) + acquired, err := store.TryAcquireLease(ctx, topic, partitionKey, testSubscriberName, testConsumerGroup, testLeaseDurationMs) if tt.wantErr { require.Error(t, err) } else { @@ -92,7 +93,7 @@ func TestpartitionLeaseStore_RenewLease(t *testing.T) { name: "successfully renew lease", setup: func(mock sqlmock.Sqlmock) { mock.ExpectExec("UPDATE queue_partition_leases"). - WithArgs(sqlmock.AnyArg(), "test-consumer", "test_topic", "part1", "test-worker"). + WithArgs(sqlmock.AnyArg(), testConsumerGroup, "test_topic", "part1", testSubscriberName). WillReturnResult(sqlmock.NewResult(0, 1)) }, wantErr: false, @@ -101,7 +102,7 @@ func TestpartitionLeaseStore_RenewLease(t *testing.T) { name: "lease not owned", setup: func(mock sqlmock.Sqlmock) { mock.ExpectExec("UPDATE queue_partition_leases"). - WithArgs(sqlmock.AnyArg(), "test-consumer", "test_topic", "part1", "test-worker"). + WithArgs(sqlmock.AnyArg(), testConsumerGroup, "test_topic", "part1", testSubscriberName). WillReturnResult(sqlmock.NewResult(0, 0)) }, wantErr: true, @@ -119,7 +120,7 @@ func TestpartitionLeaseStore_RenewLease(t *testing.T) { tt.setup(mock) - err := store.RenewLease(ctx, topic, partitionKey) + err := store.RenewLease(ctx, topic, partitionKey, testSubscriberName, testConsumerGroup, testLeaseDurationMs) if tt.wantErr { require.Error(t, err) } else { @@ -140,7 +141,7 @@ func TestpartitionLeaseStore_ReleaseLease(t *testing.T) { name: "successfully release lease", setup: func(mock sqlmock.Sqlmock) { mock.ExpectExec("DELETE FROM queue_partition_leases"). - WithArgs("test-consumer", "test_topic", "part1", "test-worker"). + WithArgs(testConsumerGroup, "test_topic", "part1", testSubscriberName). WillReturnResult(sqlmock.NewResult(0, 1)) }, wantErr: false, @@ -149,7 +150,7 @@ func TestpartitionLeaseStore_ReleaseLease(t *testing.T) { name: "idempotent - already released", setup: func(mock sqlmock.Sqlmock) { mock.ExpectExec("DELETE FROM queue_partition_leases"). - WithArgs("test-consumer", "test_topic", "part1", "test-worker"). + WithArgs(testConsumerGroup, "test_topic", "part1", testSubscriberName). WillReturnResult(sqlmock.NewResult(0, 0)) }, wantErr: false, @@ -167,7 +168,7 @@ func TestpartitionLeaseStore_ReleaseLease(t *testing.T) { tt.setup(mock) - err := store.ReleaseLease(ctx, topic, partitionKey) + err := store.ReleaseLease(ctx, topic, partitionKey, testSubscriberName, testConsumerGroup) if tt.wantErr { require.Error(t, err) } else { @@ -191,10 +192,10 @@ func TestpartitionLeaseStore_GetLeasedPartitions(t *testing.T) { AddRow("part3") mock.ExpectQuery("SELECT partition_key FROM queue_partition_leases"). - WithArgs("test-consumer", topic, "test-worker"). + WithArgs(testConsumerGroup, topic, testSubscriberName). WillReturnRows(rows) - partitions, err := store.GetLeasedPartitions(ctx, topic) + partitions, err := store.GetLeasedPartitions(ctx, topic, testSubscriberName, testConsumerGroup) require.NoError(t, err) require.Len(t, partitions, 3) require.Equal(t, []string{"part1", "part2", "part3"}, partitions) @@ -224,7 +225,7 @@ func TestpartitionLeaseStore_DiscoverAndAcquirePartitions(t *testing.T) { WillReturnResult(sqlmock.NewResult(1, 1)) // Expect ownership check - first one acquired, second not - owner := "test-worker" + owner := testSubscriberName if i == 1 { owner = "other-worker" } @@ -233,7 +234,7 @@ func TestpartitionLeaseStore_DiscoverAndAcquirePartitions(t *testing.T) { WillReturnRows(ownerRows) } - acquired, err := store.DiscoverAndAcquirePartitions(ctx, topic) + acquired, err := store.DiscoverAndAcquirePartitions(ctx, topic, testSubscriberName, testConsumerGroup, testLeaseDurationMs) require.NoError(t, err) require.Equal(t, 1, acquired) // Only 1 out of 2 was acquired require.NoError(t, mock.ExpectationsWereMet()) diff --git a/extension/queue/sql/publisher.go b/extension/queue/sql/publisher.go index e75a592f..ac8d3a5c 100644 --- a/extension/queue/sql/publisher.go +++ b/extension/queue/sql/publisher.go @@ -12,7 +12,6 @@ import ( ) type publisher struct { - config Config logger *zap.SugaredLogger metrics tally.Scope messageStore messageStore @@ -20,10 +19,9 @@ type publisher struct { closed bool } -// NewPublisher creates a publisher with the given configuration and dependencies -func NewPublisher(config Config, logger *zap.SugaredLogger, metrics tally.Scope, messageStore messageStore) *publisher { +// NewPublisher creates a publisher with the given dependencies +func NewPublisher(logger *zap.SugaredLogger, metrics tally.Scope, messageStore messageStore) *publisher { return &publisher{ - config: config, logger: logger, metrics: metrics, messageStore: messageStore, diff --git a/extension/queue/sql/publisher_test.go b/extension/queue/sql/publisher_test.go index 7add4ba4..fb11a1ae 100644 --- a/extension/queue/sql/publisher_test.go +++ b/extension/queue/sql/publisher_test.go @@ -20,9 +20,7 @@ const fixedTimestamp = int64(1234567890000) // Fixed timestamp for test repeatab func setupPublisherTest(t *testing.T, mockStore *MockmessageStore) extqueue.Publisher { t.Helper() - config := DefaultConfig("test-consumer", "test-worker") - - return NewPublisher(config, + return NewPublisher( zaptest.NewLogger(t).Sugar().Named("publisher"), tally.NoopScope.SubScope("publisher"), mockStore, diff --git a/extension/queue/sql/sql.go b/extension/queue/sql/sql.go index 259556aa..274ea404 100644 --- a/extension/queue/sql/sql.go +++ b/extension/queue/sql/sql.go @@ -27,47 +27,33 @@ type Params struct { // MetricsScope for metrics collection (required) MetricsScope tally.Scope - - // Config holds queue configuration - Config Config } // NewQueue creates a new SQL-based queue func NewQueue(params Params) (queue.Queue, error) { - if err := params.Config.Validate(); err != nil { - return nil, fmt.Errorf("invalid config: %w", err) - } - // Test connection if err := params.DB.Ping(); err != nil { return nil, fmt.Errorf("failed to ping database: %w", err) } logger := params.Logger.Sugar().Named("queue.sql") - logger.Infow("created SQL queue", - "consumer_group", params.Config.ConsumerGroup, - "worker_id", params.Config.WorkerID, - "poll_interval", params.Config.PollInterval, - "batch_size", params.Config.BatchSize, - ) + logger.Info("created SQL queue") // Create stores - messageStore := newMessageStore(params.DB, params.Config, params.Logger, params.MetricsScope) - offsetStore := newOffsetStore(params.DB, params.Config, params.Logger, params.MetricsScope) - leaseStore := newPartitionLeaseStore(params.DB, params.Config, params.Logger, params.MetricsScope) + messageStore := newMessageStore(params.DB, params.Logger, params.MetricsScope) + offsetStore := newOffsetStore(params.DB, params.Logger, params.MetricsScope) + leaseStore := newPartitionLeaseStore(params.DB, params.Logger, params.MetricsScope) queueMetrics := params.MetricsScope.SubScope("queue") // Create publisher and subscriber publisher := NewPublisher( - params.Config, logger.Named("publisher"), queueMetrics.SubScope("publisher"), messageStore, ) subscriber := NewSubscriber( - params.Config, logger.Named("subscriber"), queueMetrics.SubScope("subscriber"), messageStore, diff --git a/extension/queue/sql/sql_test.go b/extension/queue/sql/sql_test.go index f1b4bb38..e165af93 100644 --- a/extension/queue/sql/sql_test.go +++ b/extension/queue/sql/sql_test.go @@ -25,7 +25,6 @@ func TestNewQueue(t *testing.T) { DB: db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NewTestScope("test", nil), - Config: DefaultConfig("test-consumer", "test-worker"), }) require.NoError(t, err) @@ -39,24 +38,6 @@ func TestNewQueue(t *testing.T) { require.NoError(t, mock.ExpectationsWereMet()) }) - t.Run("error when config is invalid", func(t *testing.T) { - db, _, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) - require.NoError(t, err) - defer db.Close() - - config := DefaultConfig("", "") // Invalid: empty consumer group and worker ID - - q, err := NewQueue(Params{ - DB: db, - Logger: zaptest.NewLogger(t), - MetricsScope: tally.NewTestScope("test", nil), - Config: config, - }) - - require.Error(t, err) - assert.Nil(t, q) - }) - t.Run("error when DB ping fails", func(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) require.NoError(t, err) @@ -68,7 +49,6 @@ func TestNewQueue(t *testing.T) { DB: db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NewTestScope("test", nil), - Config: DefaultConfig("test-consumer", "test-worker"), }) require.Error(t, err) @@ -89,7 +69,6 @@ func TestQueue_Publisher(t *testing.T) { DB: db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NewTestScope("test", nil), - Config: DefaultConfig("test-consumer", "test-worker"), }) require.NoError(t, err) defer q.Close() @@ -116,7 +95,6 @@ func TestQueue_Subscriber(t *testing.T) { DB: db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NewTestScope("test", nil), - Config: DefaultConfig("test-consumer", "test-worker"), }) require.NoError(t, err) defer q.Close() @@ -179,7 +157,6 @@ func TestQueue_Close(t *testing.T) { DB: db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NewTestScope("test", nil), - Config: DefaultConfig("test-consumer", "test-worker"), }) require.NoError(t, err) @@ -209,7 +186,6 @@ func TestQueue_Close(t *testing.T) { DB: db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NewTestScope("test", nil), - Config: DefaultConfig("test-consumer", "test-worker"), }) require.NoError(t, err) @@ -234,7 +210,6 @@ func TestQueue_Close(t *testing.T) { DB: db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NewTestScope("test", nil), - Config: DefaultConfig("test-consumer", "test-worker"), }) require.NoError(t, err) @@ -264,13 +239,11 @@ func TestQueue_Integration(t *testing.T) { logger := zaptest.NewLogger(t) metricsScope := tally.NewTestScope("test", nil) - config := DefaultConfig("test-consumer", "test-worker") q, err := NewQueue(Params{ DB: db, Logger: logger, MetricsScope: metricsScope, - Config: config, }) require.NoError(t, err) defer q.Close() diff --git a/extension/queue/sql/stores.go b/extension/queue/sql/stores.go index aa9deb4c..5ab8be98 100644 --- a/extension/queue/sql/stores.go +++ b/extension/queue/sql/stores.go @@ -53,11 +53,12 @@ type messageStore interface { // FetchByOffset fetches messages with offset > currentOffset for a specific partition // Only fetches visible messages (invisible_until <= now) // Atomically sets invisible_until and increments retry_count - FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, limit int) ([]messageRow, error) + // visibilityTimeoutMs specifies how long messages should be invisible after fetching (in milliseconds) + FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, limit int, visibilityTimeoutMs int64) ([]messageRow, error) // MoveToDLQ moves a message to the dead letter queue - // The DLQ topic is automatically constructed from the original topic plus the configured DLQ suffix - MoveToDLQ(ctx context.Context, topic string, messageID string, failureCount int, lastError string) error + // dlqTopicSuffix is appended to the original topic to form the DLQ topic name + MoveToDLQ(ctx context.Context, topic string, messageID string, failureCount int, lastError string, dlqTopicSuffix string) error // SetVisibilityTimeout sets the invisible_until timestamp for a message // visibilityTimeoutMillis: milliseconds from now to hide the message @@ -69,34 +70,37 @@ type messageStore interface { // offsetStore handles offset table operations for per-partition offset tracking (internal use only) type offsetStore interface { // Initialize creates an offset entry for a topic+partition if it doesn't exist - Initialize(ctx context.Context, topic string, partitionKey string) error + Initialize(ctx context.Context, topic string, partitionKey string, consumerGroup string) error // GetAckedOffset returns the current acked offset for a topic+partition - GetAckedOffset(ctx context.Context, topic string, partitionKey string) (int64, error) + GetAckedOffset(ctx context.Context, topic string, partitionKey string, consumerGroup string) (int64, error) // UpdateAckedOffset updates the offset_acked for a topic+partition (only if new offset is greater) - UpdateAckedOffset(ctx context.Context, topic string, partitionKey string, offset int64) error + UpdateAckedOffset(ctx context.Context, topic string, partitionKey string, offset int64, consumerGroup string) error // AckMessage atomically deletes a message and updates the acked offset - AckMessage(ctx context.Context, topic string, partitionKey string, messageID string, offset int64, messageStore messageStore) error + AckMessage(ctx context.Context, topic string, partitionKey string, messageID string, offset int64, consumerGroup string, messageStore messageStore) error } // partitionLeaseStore handles partition lease operations (internal use only) type partitionLeaseStore interface { // TryAcquireLease attempts to acquire or renew a lease for a partition // Returns true if lease is acquired/owned by this worker - TryAcquireLease(ctx context.Context, topic string, partitionKey string) (bool, error) + // leaseDurationMs is how long the lease is valid (in milliseconds) + TryAcquireLease(ctx context.Context, topic string, partitionKey string, subscriberName string, consumerGroup string, leaseDurationMs int64) (bool, error) // RenewLease renews the lease for a partition owned by this worker - RenewLease(ctx context.Context, topic string, partitionKey string) error + // leaseDurationMs is how long the lease is valid (in milliseconds) + RenewLease(ctx context.Context, topic string, partitionKey string, subscriberName string, consumerGroup string, leaseDurationMs int64) error // ReleaseLease releases the lease for a partition owned by this worker - ReleaseLease(ctx context.Context, topic string, partitionKey string) error + ReleaseLease(ctx context.Context, topic string, partitionKey string, subscriberName string, consumerGroup string) error // GetLeasedPartitions returns all partitions currently leased by this worker - GetLeasedPartitions(ctx context.Context, topic string) ([]string, error) + GetLeasedPartitions(ctx context.Context, topic string, subscriberName string, consumerGroup string) ([]string, error) // DiscoverAndAcquirePartitions discovers partitions from messages table and tries to acquire leases // Returns the number of new leases acquired - DiscoverAndAcquirePartitions(ctx context.Context, topic string) (int, error) + // leaseDurationMs is how long the lease is valid (in milliseconds) + DiscoverAndAcquirePartitions(ctx context.Context, topic string, subscriberName string, consumerGroup string, leaseDurationMs int64) (int, error) } diff --git a/extension/queue/sql/subscriber.go b/extension/queue/sql/subscriber.go index eaf7b042..7ab82221 100644 --- a/extension/queue/sql/subscriber.go +++ b/extension/queue/sql/subscriber.go @@ -15,7 +15,6 @@ import ( ) type subscriber struct { - config Config logger *zap.SugaredLogger metrics tally.Scope messageStore messageStore @@ -31,6 +30,7 @@ type subscriber struct { type subscription struct { topic string + config extqueue.SubscriptionConfig deliveryCh chan extqueue.Delivery cancelFunc context.CancelFunc wg sync.WaitGroup @@ -45,11 +45,12 @@ type sqlDelivery struct { metadata map[string]string // Backend-specific fields for ack/nack - subscriber *subscriber - topic string - partitionKey string - offset int64 - messageID string + subscriber *subscriber + topic string + partitionKey string + offset int64 + messageID string + consumerGroup string // Track acknowledgment state mu sync.Mutex @@ -66,19 +67,21 @@ func newSQLDelivery( partitionKey string, offset int64, messageID string, + consumerGroup string, ) *sqlDelivery { return &sqlDelivery{ - msg: msg, - deliveryID: deliveryID, - attempt: attempt, - receivedAt: time.Now().UnixMilli(), - metadata: metadata, - subscriber: subscriber, - topic: topic, - partitionKey: partitionKey, - offset: offset, - messageID: messageID, - acknowledged: false, + msg: msg, + deliveryID: deliveryID, + attempt: attempt, + receivedAt: time.Now().UnixMilli(), + metadata: metadata, + subscriber: subscriber, + topic: topic, + partitionKey: partitionKey, + offset: offset, + messageID: messageID, + consumerGroup: consumerGroup, + acknowledged: false, } } @@ -117,7 +120,7 @@ func (d *sqlDelivery) Ack(ctx context.Context) error { } // Perform acknowledgment - if err := d.subscriber.offsetStore.AckMessage(ctx, d.topic, d.partitionKey, d.messageID, d.offset, d.subscriber.messageStore); err != nil { + if err := d.subscriber.offsetStore.AckMessage(ctx, d.topic, d.partitionKey, d.messageID, d.offset, d.consumerGroup, d.subscriber.messageStore); err != nil { return err } @@ -190,18 +193,10 @@ func (d *sqlDelivery) ExtendVisibilityTimeout(ctx context.Context, durationMilli return nil } -func NewSubscriber(config Config, logger *zap.SugaredLogger, metrics tally.Scope, messageStore messageStore, offsetStore offsetStore, leaseStore partitionLeaseStore) *subscriber { - logger.Infow("created subscriber", - "consumer_group", config.ConsumerGroup, - "worker_id", config.WorkerID, - "poll_interval", config.PollInterval, - "batch_size", config.BatchSize, - "max_retry_attempts", config.Retry.MaxAttempts, - "lease_renewal_interval", config.LeaseRenewalInterval, - ) +func NewSubscriber(logger *zap.SugaredLogger, metrics tally.Scope, messageStore messageStore, offsetStore offsetStore, leaseStore partitionLeaseStore) *subscriber { + logger.Info("created subscriber") return &subscriber{ - config: config, logger: logger, metrics: metrics, messageStore: messageStore, @@ -212,7 +207,7 @@ func NewSubscriber(config Config, logger *zap.SugaredLogger, metrics tally.Scope } // Subscribe starts consuming messages from the specified topic -func (s *subscriber) Subscribe(ctx context.Context, topic string) (<-chan extqueue.Delivery, error) { +func (s *subscriber) Subscribe(ctx context.Context, topic string, config extqueue.SubscriptionConfig) (<-chan extqueue.Delivery, error) { s.mu.RLock() closed := s.closed s.mu.RUnlock() @@ -228,16 +223,25 @@ func (s *subscriber) Subscribe(ctx context.Context, topic string) (<-chan extque return nil, fmt.Errorf("subscribe failure: invalid topic name. err: %w", err) } + // Create subscription key (topic + consumer group must be unique) + subKey := topic + ":" + config.ConsumerGroup + s.subMu.Lock() defer s.subMu.Unlock() // Check if already subscribed - if sub, exists := s.subscriptions[topic]; exists { - s.logger.Debugw("reusing existing subscription", "topic", topic) + if sub, exists := s.subscriptions[subKey]; exists { + s.logger.Debugw("reusing existing subscription", "topic", topic, "consumer_group", config.ConsumerGroup) return sub.deliveryCh, nil } - s.logger.Infow("creating new subscription", "topic", topic) + s.logger.Infow("creating new subscription", + "topic", topic, + "consumer_group", config.ConsumerGroup, + "subscriber_name", config.SubscriberName, + "poll_interval_ms", config.PollIntervalMs, + "batch_size", config.BatchSize, + ) // Create new subscription // Use a cancellable context for managing the subscription lifecycle @@ -245,11 +249,12 @@ func (s *subscriber) Subscribe(ctx context.Context, topic string) (<-chan extque subCtx, cancel := context.WithCancel(context.Background()) sub := &subscription{ topic: topic, - deliveryCh: make(chan extqueue.Delivery, s.config.BatchSize*2), + config: config, + deliveryCh: make(chan extqueue.Delivery, config.BatchSize*2), cancelFunc: cancel, } - s.subscriptions[topic] = sub + s.subscriptions[subKey] = sub // Track active subscription s.metrics.Tagged(map[string]string{"topic": topic}).Gauge("active_subscriptions").Update(1) @@ -258,7 +263,7 @@ func (s *subscriber) Subscribe(ctx context.Context, topic string) (<-chan extque sub.wg.Add(1) go s.managePartitions(subCtx, sub) - s.logger.Debugw("subscription created", "topic", topic, "consumer_group", s.config.ConsumerGroup, "worker_id", s.config.WorkerID) + s.logger.Debugw("subscription created", "topic", topic, "consumer_group", config.ConsumerGroup, "subscriber_name", config.SubscriberName) return sub.deliveryCh, nil } @@ -267,22 +272,26 @@ func (s *subscriber) managePartitions(ctx context.Context, sub *subscription) { defer sub.wg.Done() defer close(sub.deliveryCh) - pollTicker := time.NewTicker(s.config.PollInterval) + pollTicker := time.NewTicker(time.Duration(sub.config.PollIntervalMs) * time.Millisecond) defer pollTicker.Stop() - leaseTicker := time.NewTicker(s.config.LeaseRenewalInterval) + leaseTicker := time.NewTicker(time.Duration(sub.config.LeaseRenewalIntervalMs) * time.Millisecond) defer leaseTicker.Stop() for { select { case <-ctx.Done(): - // Release all leases on shutdown - s.releaseAllLeases(ctx, sub.topic) + // Release all leases on shutdown with a fresh context + // The passed context is already cancelled, so we create a new one with timeout + // to allow graceful lease release operations to complete + cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + s.releaseAllLeases(cleanupCtx, sub) return case <-leaseTicker.C: // Renew existing leases - s.renewLeases(ctx, sub.topic) + s.renewLeases(ctx, sub) case <-pollTicker.C: // Fetch and deliver messages from leased partitions @@ -292,24 +301,25 @@ func (s *subscriber) managePartitions(ctx context.Context, sub *subscription) { } // renewLeases renews leases for all partitions owned by this worker -func (s *subscriber) renewLeases(ctx context.Context, topic string) { - leasedPartitions, err := s.leaseStore.GetLeasedPartitions(ctx, topic) +func (s *subscriber) renewLeases(ctx context.Context, sub *subscription) { + cfg := sub.config + leasedPartitions, err := s.leaseStore.GetLeasedPartitions(ctx, sub.topic, cfg.SubscriberName, cfg.ConsumerGroup) if err != nil { s.logger.Errorw("failed to get leased partitions for renewal", - "topic", topic, + "topic", sub.topic, "error", err, ) // Error suppressed: lease renewal is best-effort. If we can't get leases, // they will eventually expire and be reacquired by this or another worker. // Failing the entire renewal cycle would be worse than skipping one iteration. - s.metrics.Tagged(map[string]string{"topic": topic}).Counter("lease_renewal.get_partitions_errors").Inc(1) + s.metrics.Tagged(map[string]string{"topic": sub.topic}).Counter("lease_renewal.get_partitions_errors").Inc(1) return } for _, partitionKey := range leasedPartitions { - if err := s.leaseStore.RenewLease(ctx, topic, partitionKey); err != nil { + if err := s.leaseStore.RenewLease(ctx, sub.topic, partitionKey, cfg.SubscriberName, cfg.ConsumerGroup, cfg.LeaseDurationMs); err != nil { s.logger.Warnw("failed to renew lease", - "topic", topic, + "topic", sub.topic, "partition_key", partitionKey, "error", err, ) @@ -317,7 +327,7 @@ func (s *subscriber) renewLeases(ctx context.Context, topic string) { // The partition will eventually expire and be re-acquired by this or another worker. // Failing fast would prevent other partitions from being renewed. s.metrics.Tagged(map[string]string{ - "topic": topic, + "topic": sub.topic, "partition_key": partitionKey, }).Counter("lease_renewal.renew_errors").Inc(1) } @@ -325,20 +335,21 @@ func (s *subscriber) renewLeases(ctx context.Context, topic string) { } // releaseAllLeases releases all leases for a topic -func (s *subscriber) releaseAllLeases(ctx context.Context, topic string) { - leasedPartitions, err := s.leaseStore.GetLeasedPartitions(ctx, topic) +func (s *subscriber) releaseAllLeases(ctx context.Context, sub *subscription) { + cfg := sub.config + leasedPartitions, err := s.leaseStore.GetLeasedPartitions(ctx, sub.topic, cfg.SubscriberName, cfg.ConsumerGroup) if err != nil { s.logger.Errorw("failed to get leased partitions for release", - "topic", topic, + "topic", sub.topic, "error", err, ) return } for _, partitionKey := range leasedPartitions { - if err := s.leaseStore.ReleaseLease(ctx, topic, partitionKey); err != nil { + if err := s.leaseStore.ReleaseLease(ctx, sub.topic, partitionKey, cfg.SubscriberName, cfg.ConsumerGroup); err != nil { s.logger.Warnw("failed to release lease", - "topic", topic, + "topic", sub.topic, "partition_key", partitionKey, "error", err, ) @@ -349,14 +360,15 @@ func (s *subscriber) releaseAllLeases(ctx context.Context, topic string) { // pollLeasedPartitions fetches and delivers messages from all leased partitions func (s *subscriber) pollLeasedPartitions(ctx context.Context, sub *subscription) { + cfg := sub.config // Discover and try to acquire leases for new partitions - acquiredCount, err := s.leaseStore.DiscoverAndAcquirePartitions(ctx, sub.topic) + acquiredCount, err := s.leaseStore.DiscoverAndAcquirePartitions(ctx, sub.topic, cfg.SubscriberName, cfg.ConsumerGroup, cfg.LeaseDurationMs) if err == nil && acquiredCount > 0 { s.metrics.Tagged(map[string]string{"topic": sub.topic}).Counter("leases_acquired").Inc(int64(acquiredCount)) } // Get currently leased partitions - leasedPartitions, err := s.leaseStore.GetLeasedPartitions(ctx, sub.topic) + leasedPartitions, err := s.leaseStore.GetLeasedPartitions(ctx, sub.topic, cfg.SubscriberName, cfg.ConsumerGroup) if err != nil { s.logger.Errorw("failed to get leased partitions", "topic", sub.topic, "error", err) return @@ -377,22 +389,23 @@ func (s *subscriber) pollLeasedPartitions(ctx context.Context, sub *subscription // fetchAndDeliverPartition fetches messages from a specific partition and delivers them func (s *subscriber) fetchAndDeliverPartition(ctx context.Context, sub *subscription, partitionKey string) { start := time.Now() + cfg := sub.config // Initialize offset for this partition if needed - if err := s.offsetStore.Initialize(ctx, sub.topic, partitionKey); err != nil { + if err := s.offsetStore.Initialize(ctx, sub.topic, partitionKey, cfg.ConsumerGroup); err != nil { s.logger.Errorw("offset initialization failure", "topic", sub.topic, "partition_key", partitionKey, "error", err) return } // Get current offset for this partition - currentOffset, err := s.offsetStore.GetAckedOffset(ctx, sub.topic, partitionKey) + currentOffset, err := s.offsetStore.GetAckedOffset(ctx, sub.topic, partitionKey, cfg.ConsumerGroup) if err != nil { s.logger.Errorw("get current offset failure", "topic", sub.topic, "partition_key", partitionKey, "error", err) return } // Fetch messages for this partition - rows, err := s.messageStore.FetchByOffset(ctx, sub.topic, partitionKey, currentOffset, s.config.BatchSize) + rows, err := s.messageStore.FetchByOffset(ctx, sub.topic, partitionKey, currentOffset, cfg.BatchSize, cfg.VisibilityTimeoutMs) if err != nil { s.logger.Errorw("fetch messages failure", "topic", sub.topic, "partition_key", partitionKey, "error", err) return @@ -401,7 +414,7 @@ func (s *subscriber) fetchAndDeliverPartition(ctx context.Context, sub *subscrip messageCount := 0 for _, row := range rows { // Check if message has exceeded retry limit (persistent retry_count from DB) - if row.RetryCount >= s.config.Retry.MaxAttempts { + if row.RetryCount >= cfg.Retry.MaxAttempts { s.logger.Warnw("message exceeded retry limit", "topic", sub.topic, "partition_key", partitionKey, @@ -410,9 +423,9 @@ func (s *subscriber) fetchAndDeliverPartition(ctx context.Context, sub *subscrip ) // Move to DLQ if enabled - if s.config.DLQ.Enabled { - dlqTopic := sub.topic + s.config.DLQ.TopicSuffix - if err := s.messageStore.MoveToDLQ(ctx, sub.topic, row.ID, row.RetryCount, "exceeded retry limit"); err != nil { + if cfg.DLQ.Enabled { + dlqTopic := sub.topic + cfg.DLQ.TopicSuffix + if err := s.messageStore.MoveToDLQ(ctx, sub.topic, row.ID, row.RetryCount, "exceeded retry limit", cfg.DLQ.TopicSuffix); err != nil { s.logger.Errorw("failed to move message to DLQ", "topic", sub.topic, "dlq_topic", dlqTopic, @@ -432,7 +445,7 @@ func (s *subscriber) fetchAndDeliverPartition(ctx context.Context, sub *subscrip }).Counter("messages_moved_to_dlq").Inc(1) // Update offset since message is now processed (moved to DLQ) - if err := s.offsetStore.UpdateAckedOffset(ctx, sub.topic, partitionKey, row.Offset); err != nil { + if err := s.offsetStore.UpdateAckedOffset(ctx, sub.topic, partitionKey, row.Offset, cfg.ConsumerGroup); err != nil { s.logger.Errorw("failed to update offset after DLQ move", "topic", sub.topic, "partition_key", partitionKey, @@ -491,6 +504,7 @@ func (s *subscriber) fetchAndDeliverPartition(ctx context.Context, sub *subscrip partitionKey, row.Offset, row.ID, + cfg.ConsumerGroup, ) // Deliver message diff --git a/extension/queue/sql/subscriber_test.go b/extension/queue/sql/subscriber_test.go index 1532db4d..d2c4eff6 100644 --- a/extension/queue/sql/subscriber_test.go +++ b/extension/queue/sql/subscriber_test.go @@ -13,12 +13,13 @@ import ( extqueue "github.com/uber/submitqueue/extension/queue" ) +func testSubscriptionConfig() extqueue.SubscriptionConfig { + return extqueue.DefaultSubscriptionConfig("test-subscriber", "test-consumer") +} + func setupSubscriberTest(t *testing.T, mockMessageStore *MockmessageStore, mockOffsetStore *MockoffsetStore, mockLeaseStore *MockpartitionLeaseStore) extqueue.Subscriber { t.Helper() - - config := DefaultConfig("test-consumer", "test-worker") - - return NewSubscriber(config, zaptest.NewLogger(t).Sugar().Named("subscriber"), tally.NoopScope.SubScope("subscriber"), mockMessageStore, mockOffsetStore, mockLeaseStore) + return NewSubscriber(zaptest.NewLogger(t).Sugar().Named("subscriber"), tally.NoopScope.SubScope("subscriber"), mockMessageStore, mockOffsetStore, mockLeaseStore) } func TestSubscriber_Subscribe(t *testing.T) { @@ -39,7 +40,7 @@ func TestSubscriber_Subscribe(t *testing.T) { expectedChans: 2, }, { - name: "same topic returns same channel", + name: "same topic and consumer group returns same channel", topics: []string{"test_topic", "test_topic"}, expectSame: true, expectedChans: 1, @@ -57,17 +58,18 @@ func TestSubscriber_Subscribe(t *testing.T) { sub := setupSubscriberTest(t, mockMessageStore, mockOffsetStore, mockLeaseStore) ctx := context.Background() + cfg := testSubscriptionConfig() var channels []<-chan extqueue.Delivery for _, topic := range tt.topics { - ch, err := sub.Subscribe(ctx, topic) + ch, err := sub.Subscribe(ctx, topic, cfg) require.NoError(t, err) assert.NotNil(t, ch) channels = append(channels, ch) } if tt.expectSame && len(channels) == 2 { - assert.Equal(t, channels[0], channels[1], "should return same channel for same topic") + assert.Equal(t, channels[0], channels[1], "should return same channel for same topic and consumer group") } }) } @@ -75,16 +77,16 @@ func TestSubscriber_Subscribe(t *testing.T) { func TestSubscriber_Close(t *testing.T) { tests := []struct { - name string - setupSub func(ctx context.Context, sub extqueue.Subscriber) error - closeCount int - subscribeAfter bool - expectSubError bool + name string + setupSub func(ctx context.Context, sub extqueue.Subscriber) error + closeCount int + subscribeAfter bool + expectSubError bool }{ { name: "close with active subscription", setupSub: func(ctx context.Context, sub extqueue.Subscriber) error { - _, err := sub.Subscribe(ctx, "test_topic") + _, err := sub.Subscribe(ctx, "test_topic", testSubscriptionConfig()) return err }, closeCount: 1, @@ -113,7 +115,7 @@ func TestSubscriber_Close(t *testing.T) { mockLeaseStore := NewMockpartitionLeaseStore(ctrl) // Expect lease operations during cleanup - mockLeaseStore.EXPECT().GetLeasedPartitions(gomock.Any(), gomock.Any()).Return([]string{}, nil).AnyTimes() + mockLeaseStore.EXPECT().GetLeasedPartitions(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]string{}, nil).AnyTimes() sub := setupSubscriberTest(t, mockMessageStore, mockOffsetStore, mockLeaseStore) ctx := context.Background() @@ -132,7 +134,7 @@ func TestSubscriber_Close(t *testing.T) { // Try to subscribe after close if needed if tt.subscribeAfter { - ch, err := sub.Subscribe(ctx, "test_topic") + ch, err := sub.Subscribe(ctx, "test_topic", testSubscriptionConfig()) if tt.expectSubError { require.Error(t, err) assert.Nil(t, ch) diff --git a/extension/queue/subscriber.go b/extension/queue/subscriber.go index 46d0ca79..5c294bdc 100644 --- a/extension/queue/subscriber.go +++ b/extension/queue/subscriber.go @@ -7,15 +7,18 @@ import ( // Subscriber consumes messages from topics. // Implementations must be thread-safe. type Subscriber interface { - // Subscribe starts consuming messages from the specified topic. + // Subscribe starts consuming messages from the specified topic with the given config. // Returns a channel of Delivery instances and an error if subscription fails. // + // Each subscription can have its own configuration for polling, batching, + // retries, and dead letter queue behavior. + // // The channel is closed when the subscriber is closed or context is cancelled. // Implementations should handle infrastructure errors internally (e.g., reconnect). // // Each Delivery provides the message and methods to acknowledge or reject it. // Consumers should call delivery.Ack() or delivery.Nack() for each delivery. - Subscribe(ctx context.Context, topic string) (<-chan Delivery, error) + Subscribe(ctx context.Context, topic string, config SubscriptionConfig) (<-chan Delivery, error) // Close gracefully shuts down the subscriber. // All delivery channels will be closed. diff --git a/extension/queue/subscription_config.go b/extension/queue/subscription_config.go new file mode 100644 index 00000000..89651e1e --- /dev/null +++ b/extension/queue/subscription_config.go @@ -0,0 +1,88 @@ +package queue + +// SubscriptionConfig holds per-subscription configuration. +// Each subscription (topic) can have its own settings for polling, +// batching, retries, and dead letter queue behavior. +type SubscriptionConfig struct { + // SubscriberName uniquely identifies this subscriber instance for partition leases. + // Different workers should use different names (e.g., hostname, pod name, UUID). + // Combined with ConsumerGroup, this determines which worker owns a partition lease. + SubscriberName string + + // ConsumerGroup identifies this consumer for offset tracking. + // Different consumer groups maintain independent offsets. + ConsumerGroup string + + // PollIntervalMs is how often to poll for new messages (in milliseconds). + PollIntervalMs int64 + + // BatchSize is the maximum number of messages to fetch per poll. + BatchSize int + + // VisibilityTimeoutMs is how long a message is invisible after being fetched (in milliseconds). + // If the worker crashes or doesn't ack/nack in time, the message becomes + // visible again after this duration. + VisibilityTimeoutMs int64 + + // LeaseRenewalIntervalMs is how often to renew partition leases (in milliseconds). + LeaseRenewalIntervalMs int64 + + // LeaseDurationMs is how long a lease is valid without renewal (in milliseconds). + // Stale leases (not renewed within this duration) can be stolen by other workers. + LeaseDurationMs int64 + + // Retry configures message retry behavior. + Retry RetryConfig + + // DLQ configures dead letter queue behavior. + DLQ DLQConfig +} + +// RetryConfig configures message retry behavior. +type RetryConfig struct { + // MaxAttempts is the maximum number of processing attempts. + // After this many attempts, the message is moved to DLQ (if enabled). + MaxAttempts int + + // InitialBackoffMs is the initial backoff duration for retries (in milliseconds). + InitialBackoffMs int64 + + // MaxBackoffMs is the maximum backoff duration (in milliseconds). + MaxBackoffMs int64 + + // BackoffMultiplier is the multiplier for exponential backoff. + BackoffMultiplier float64 +} + +// DLQConfig configures dead letter queue behavior. +type DLQConfig struct { + // Enabled enables dead letter queue. + Enabled bool + + // TopicSuffix is appended to the original topic name to create the DLQ topic. + // For example, if original topic is "orders" and suffix is "_dlq", DLQ topic will be "orders_dlq". + TopicSuffix string +} + +// DefaultSubscriptionConfig returns a SubscriptionConfig with sensible defaults. +func DefaultSubscriptionConfig(subscriberName, consumerGroup string) SubscriptionConfig { + return SubscriptionConfig{ + SubscriberName: subscriberName, + ConsumerGroup: consumerGroup, + PollIntervalMs: 100, // 100ms + BatchSize: 10, + VisibilityTimeoutMs: 60000, // 60s + LeaseRenewalIntervalMs: 10000, // 10s + LeaseDurationMs: 30000, // 30s + Retry: RetryConfig{ + MaxAttempts: 3, + InitialBackoffMs: 1000, // 1s + MaxBackoffMs: 30000, // 30s + BackoffMultiplier: 2.0, + }, + DLQ: DLQConfig{ + Enabled: true, + TopicSuffix: "_dlq", + }, + } +} diff --git a/extension/queue/subscription_config_test.go b/extension/queue/subscription_config_test.go new file mode 100644 index 00000000..59fe935e --- /dev/null +++ b/extension/queue/subscription_config_test.go @@ -0,0 +1,141 @@ +package queue + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDefaultSubscriptionConfig(t *testing.T) { + subscriberName := "test-worker" + consumerGroup := "test-consumer" + + config := DefaultSubscriptionConfig(subscriberName, consumerGroup) + + // Verify required fields are set + assert.Equal(t, subscriberName, config.SubscriberName) + assert.Equal(t, consumerGroup, config.ConsumerGroup) + + // Verify default timing values (in milliseconds) + assert.Equal(t, int64(100), config.PollIntervalMs) + assert.Equal(t, 10, config.BatchSize) + assert.Equal(t, int64(60000), config.VisibilityTimeoutMs) + assert.Equal(t, int64(10000), config.LeaseRenewalIntervalMs) + assert.Equal(t, int64(30000), config.LeaseDurationMs) + + // Verify retry config defaults + assert.Equal(t, 3, config.Retry.MaxAttempts) + assert.Equal(t, int64(1000), config.Retry.InitialBackoffMs) + assert.Equal(t, int64(30000), config.Retry.MaxBackoffMs) + assert.Equal(t, 2.0, config.Retry.BackoffMultiplier) + + // Verify DLQ config defaults + assert.True(t, config.DLQ.Enabled) + assert.Equal(t, "_dlq", config.DLQ.TopicSuffix) +} + +func TestSubscriptionConfig_FieldsAreIndependent(t *testing.T) { + // Create two configs and modify one to ensure they're independent + config1 := DefaultSubscriptionConfig("worker-1", "consumer-1") + config2 := DefaultSubscriptionConfig("worker-2", "consumer-2") + + // Modify config1 + config1.PollIntervalMs = 500 + config1.BatchSize = 100 + config1.Retry.MaxAttempts = 5 + config1.DLQ.TopicSuffix = "_failed" + + // Verify config2 is unaffected + assert.Equal(t, "worker-2", config2.SubscriberName) + assert.Equal(t, "consumer-2", config2.ConsumerGroup) + assert.Equal(t, int64(100), config2.PollIntervalMs) + assert.Equal(t, 10, config2.BatchSize) + assert.Equal(t, 3, config2.Retry.MaxAttempts) + assert.Equal(t, "_dlq", config2.DLQ.TopicSuffix) +} + +func TestSubscriptionConfig_CustomValues(t *testing.T) { + config := DefaultSubscriptionConfig("my-worker", "my-consumer") + + // Override with custom values (in milliseconds) + config.PollIntervalMs = 200 + config.BatchSize = 50 + config.VisibilityTimeoutMs = 120000 + config.LeaseRenewalIntervalMs = 20000 + config.LeaseDurationMs = 60000 + config.Retry.MaxAttempts = 5 + config.Retry.InitialBackoffMs = 2000 + config.Retry.MaxBackoffMs = 60000 + config.Retry.BackoffMultiplier = 3.0 + config.DLQ.Enabled = false + config.DLQ.TopicSuffix = "_dead" + + // Verify all custom values + assert.Equal(t, "my-worker", config.SubscriberName) + assert.Equal(t, "my-consumer", config.ConsumerGroup) + assert.Equal(t, int64(200), config.PollIntervalMs) + assert.Equal(t, 50, config.BatchSize) + assert.Equal(t, int64(120000), config.VisibilityTimeoutMs) + assert.Equal(t, int64(20000), config.LeaseRenewalIntervalMs) + assert.Equal(t, int64(60000), config.LeaseDurationMs) + assert.Equal(t, 5, config.Retry.MaxAttempts) + assert.Equal(t, int64(2000), config.Retry.InitialBackoffMs) + assert.Equal(t, int64(60000), config.Retry.MaxBackoffMs) + assert.Equal(t, 3.0, config.Retry.BackoffMultiplier) + assert.False(t, config.DLQ.Enabled) + assert.Equal(t, "_dead", config.DLQ.TopicSuffix) +} + +func TestRetryConfig_ZeroValues(t *testing.T) { + // Test that zero-value RetryConfig can be created + var config RetryConfig + + assert.Equal(t, 0, config.MaxAttempts) + assert.Equal(t, int64(0), config.InitialBackoffMs) + assert.Equal(t, int64(0), config.MaxBackoffMs) + assert.Equal(t, 0.0, config.BackoffMultiplier) +} + +func TestDLQConfig_ZeroValues(t *testing.T) { + // Test that zero-value DLQConfig can be created + var config DLQConfig + + assert.False(t, config.Enabled) + assert.Equal(t, "", config.TopicSuffix) +} + +func TestSubscriptionConfig_ZeroValues(t *testing.T) { + // Test that zero-value SubscriptionConfig can be created + var config SubscriptionConfig + + assert.Equal(t, "", config.SubscriberName) + assert.Equal(t, "", config.ConsumerGroup) + assert.Equal(t, int64(0), config.PollIntervalMs) + assert.Equal(t, 0, config.BatchSize) + assert.Equal(t, int64(0), config.VisibilityTimeoutMs) + assert.Equal(t, int64(0), config.LeaseRenewalIntervalMs) + assert.Equal(t, int64(0), config.LeaseDurationMs) + assert.Equal(t, 0, config.Retry.MaxAttempts) + assert.False(t, config.DLQ.Enabled) +} + +func TestSubscriptionConfig_DifferentConsumerGroups(t *testing.T) { + // Test that different consumer groups get independent configs + tests := []struct { + subscriberName string + consumerGroup string + }{ + {"worker-1", "group-A"}, + {"worker-1", "group-B"}, + {"worker-2", "group-A"}, + } + + for _, tt := range tests { + t.Run(tt.subscriberName+"_"+tt.consumerGroup, func(t *testing.T) { + config := DefaultSubscriptionConfig(tt.subscriberName, tt.consumerGroup) + require.Equal(t, tt.subscriberName, config.SubscriberName) + require.Equal(t, tt.consumerGroup, config.ConsumerGroup) + }) + } +} diff --git a/extension/storage/mysql/BUILD.bazel b/extension/storage/mysql/BUILD.bazel index d17f705f..b3efb345 100644 --- a/extension/storage/mysql/BUILD.bazel +++ b/extension/storage/mysql/BUILD.bazel @@ -3,8 +3,8 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "mysql", srcs = [ - "storage.go", "request_store.go", + "storage.go", ], importpath = "github.com/uber/submitqueue/extension/storage/mysql", visibility = ["//visibility:public"],