From fc19779cb6abdc22d6219db2f853ae77e494f0d7 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Mon, 16 Feb 2026 16:15:26 -0800 Subject: [PATCH] feat(queue/sql): add MySQL schema and configuration ## Why? Need database schema and configuration for SQL-based queue implementation to support distributed message processing. ## What? - MySQL schema with 4 tables: messages, offsets, partition leases, and DLQ - Configuration struct with validation for consumer groups, timeouts, and retry policies - Bazel build integration ## Test Plan - Config validation tests pass - Default config values are correct - Invalid configs are rejected appropriately --- extensions/queue/sql/BUILD.bazel | 18 +++ extensions/queue/sql/config.go | 134 ++++++++++++++++++ extensions/queue/sql/config_test.go | 113 +++++++++++++++ extensions/queue/sql/schema/BUILD.bazel | 10 ++ extensions/queue/sql/schema/queue_dlq.sql | 40 ++++++ .../queue/sql/schema/queue_messages.sql | 41 ++++++ extensions/queue/sql/schema/queue_offsets.sql | 33 +++++ .../sql/schema/queue_partition_leases.sql | 37 +++++ 8 files changed, 426 insertions(+) create mode 100644 extensions/queue/sql/BUILD.bazel create mode 100644 extensions/queue/sql/config.go create mode 100644 extensions/queue/sql/config_test.go create mode 100644 extensions/queue/sql/schema/BUILD.bazel create mode 100644 extensions/queue/sql/schema/queue_dlq.sql create mode 100644 extensions/queue/sql/schema/queue_messages.sql create mode 100644 extensions/queue/sql/schema/queue_offsets.sql create mode 100644 extensions/queue/sql/schema/queue_partition_leases.sql diff --git a/extensions/queue/sql/BUILD.bazel b/extensions/queue/sql/BUILD.bazel new file mode 100644 index 00000000..c2dd000c --- /dev/null +++ b/extensions/queue/sql/BUILD.bazel @@ -0,0 +1,18 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "sql", + srcs = ["config.go"], + importpath = "github.com/uber/submitqueue/extensions/queue/sql", + visibility = 
const (
	// Fixed table names for single-table design.
	MessagesTableName        = "queue_messages"
	PartitionLeasesTableName = "queue_partition_leases"
	OffsetsTableName         = "queue_offsets"
	DLQTableName             = "queue_dlq"
)

// Config holds configuration for SQL-based queue.
// DB connection, logger, and metrics are passed separately to NewFactory.
type Config struct {
	// ConsumerGroup identifies this consumer for offset tracking (required).
	ConsumerGroup string

	// WorkerID uniquely identifies this worker instance (required for
	// partition leases). Example: hostname, pod name, UUID, etc.
	WorkerID string

	// PollInterval is how often to poll for new messages.
	PollInterval time.Duration

	// BatchSize is the number of messages to fetch per poll.
	BatchSize int

	// VisibilityTimeout is how long a message is invisible after being
	// fetched. If the worker crashes or gets stuck, the message becomes
	// visible again after this duration.
	VisibilityTimeout time.Duration

	// LeaseRenewalInterval is how often to renew partition leases.
	// Validate requires it to be strictly less than LeaseDuration.
	LeaseRenewalInterval time.Duration

	// LeaseDuration is how long a lease is valid without renewal. Stale
	// leases (not renewed within this duration) can be stolen by other
	// workers.
	LeaseDuration time.Duration

	// Retry configures message retry.
	Retry RetryConfig

	// DLQ configures the dead letter queue.
	DLQ DLQConfig
}

// RetryConfig configures message retry behavior.
type RetryConfig struct {
	// MaxAttempts is the maximum number of processing attempts. After this
	// many retries, the message is moved to the DLQ (if enabled). This
	// includes both visibility timeout retries and explicit Nack retries.
	MaxAttempts int

	// InitialBackoff is the initial backoff duration for explicit Nack
	// retries.
	InitialBackoff time.Duration

	// MaxBackoff is the maximum backoff duration. Validate requires it to
	// be at least InitialBackoff.
	MaxBackoff time.Duration

	// BackoffMultiplier is the multiplier for exponential backoff. Validate
	// requires it to be a number >= 1.0.
	BackoffMultiplier float64
}

// DLQConfig configures the dead letter queue.
type DLQConfig struct {
	// Enabled enables the dead letter queue.
	Enabled bool
}

// DefaultConfig returns a Config for the given consumer group and worker ID
// with every other field set to a default that passes Validate.
func DefaultConfig(consumerGroup, workerID string) Config {
	return Config{
		ConsumerGroup:        consumerGroup,
		WorkerID:             workerID,
		PollInterval:         100 * time.Millisecond,
		BatchSize:            10,
		VisibilityTimeout:    60 * time.Second,
		LeaseRenewalInterval: 10 * time.Second,
		LeaseDuration:        30 * time.Second,
		Retry: RetryConfig{
			MaxAttempts:       3,
			InitialBackoff:    1 * time.Second,
			MaxBackoff:        30 * time.Second,
			BackoffMultiplier: 2.0,
		},
		DLQ: DLQConfig{
			Enabled: true,
		},
	}
}

// Validate checks if the configuration is valid.
//
// It returns an error describing the first violated rule, or nil when every
// rule holds. Rules are checked in field order: required identifiers, then
// polling, visibility and lease timings, then the retry policy.
func (c *Config) Validate() error {
	if c.ConsumerGroup == "" {
		return fmt.Errorf("ConsumerGroup is required")
	}
	if c.WorkerID == "" {
		return fmt.Errorf("WorkerID is required")
	}
	if c.PollInterval <= 0 {
		return fmt.Errorf("PollInterval must be positive")
	}
	if c.BatchSize <= 0 {
		return fmt.Errorf("BatchSize must be positive")
	}
	if c.VisibilityTimeout <= 0 {
		return fmt.Errorf("VisibilityTimeout must be positive")
	}
	if c.LeaseRenewalInterval <= 0 {
		return fmt.Errorf("LeaseRenewalInterval must be positive")
	}
	if c.LeaseDuration <= 0 {
		return fmt.Errorf("LeaseDuration must be positive")
	}
	if c.LeaseRenewalInterval >= c.LeaseDuration {
		return fmt.Errorf("LeaseRenewalInterval must be less than LeaseDuration")
	}
	if c.Retry.MaxAttempts < 1 {
		return fmt.Errorf("Retry.MaxAttempts must be at least 1")
	}
	if c.Retry.InitialBackoff <= 0 {
		return fmt.Errorf("Retry.InitialBackoff must be positive")
	}
	if c.Retry.MaxBackoff <= 0 {
		return fmt.Errorf("Retry.MaxBackoff must be positive")
	}
	// A cap below the starting backoff would make the very first retry
	// exceed the configured maximum.
	if c.Retry.MaxBackoff < c.Retry.InitialBackoff {
		return fmt.Errorf("Retry.MaxBackoff must be >= Retry.InitialBackoff")
	}
	// Written as a negated >= so that NaN, which compares false against
	// every value, is rejected as well as values below 1.0.
	if !(c.Retry.BackoffMultiplier >= 1.0) {
		return fmt.Errorf("Retry.BackoffMultiplier must be >= 1.0")
	}
	return nil
}
cfg.LeaseRenewalInterval) + assert.Equal(t, 30*time.Second, cfg.LeaseDuration) + assert.True(t, cfg.DLQ.Enabled) + assert.Equal(t, 3, cfg.Retry.MaxAttempts) + assert.Equal(t, 1*time.Second, cfg.Retry.InitialBackoff) + assert.Equal(t, 30*time.Second, cfg.Retry.MaxBackoff) + assert.Equal(t, 2.0, cfg.Retry.BackoffMultiplier) +} + +func TestConfigValidation(t *testing.T) { + tests := []struct { + name string + config Config + expectError bool + errorMsg string + }{ + { + name: "valid config", + config: DefaultConfig("test-consumer", "test-worker"), + expectError: false, + }, + { + name: "empty consumer group", + config: Config{ + ConsumerGroup: "", + WorkerID: "test-worker", + PollInterval: 100 * time.Millisecond, + BatchSize: 10, + VisibilityTimeout: 60 * time.Second, + LeaseRenewalInterval: 10 * time.Second, + LeaseDuration: 30 * time.Second, + Retry: DefaultConfig("dummy", "dummy").Retry, + }, + expectError: true, + errorMsg: "ConsumerGroup is required", + }, + { + name: "empty worker ID", + config: Config{ + ConsumerGroup: "test", + WorkerID: "", + PollInterval: 100 * time.Millisecond, + BatchSize: 10, + VisibilityTimeout: 60 * time.Second, + LeaseRenewalInterval: 10 * time.Second, + LeaseDuration: 30 * time.Second, + Retry: DefaultConfig("dummy", "dummy").Retry, + }, + expectError: true, + errorMsg: "WorkerID is required", + }, + { + name: "invalid poll interval", + config: Config{ + ConsumerGroup: "test", + WorkerID: "test-worker", + PollInterval: 0, + BatchSize: 10, + VisibilityTimeout: 60 * time.Second, + LeaseRenewalInterval: 10 * time.Second, + LeaseDuration: 30 * time.Second, + Retry: DefaultConfig("dummy", "dummy").Retry, + }, + expectError: true, + errorMsg: "PollInterval must be positive", + }, + { + name: "invalid batch size", + config: Config{ + ConsumerGroup: "test", + WorkerID: "test-worker", + PollInterval: 100 * time.Millisecond, + BatchSize: 0, + VisibilityTimeout: 60 * time.Second, + LeaseRenewalInterval: 10 * time.Second, + LeaseDuration: 30 * 
time.Second, + Retry: DefaultConfig("dummy", "dummy").Retry, + }, + expectError: true, + errorMsg: "BatchSize must be positive", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.config.Validate() + if tt.expectError { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errorMsg) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/extensions/queue/sql/schema/BUILD.bazel b/extensions/queue/sql/schema/BUILD.bazel new file mode 100644 index 00000000..df9efc2e --- /dev/null +++ b/extensions/queue/sql/schema/BUILD.bazel @@ -0,0 +1,10 @@ +filegroup( + name = "schema", + srcs = [ + "queue_dlq.sql", + "queue_messages.sql", + "queue_offsets.sql", + "queue_partition_leases.sql", + ], + visibility = ["//visibility:public"], +) diff --git a/extensions/queue/sql/schema/queue_dlq.sql b/extensions/queue/sql/schema/queue_dlq.sql new file mode 100644 index 00000000..7a939264 --- /dev/null +++ b/extensions/queue/sql/schema/queue_dlq.sql @@ -0,0 +1,40 @@ +-- DEAD LETTER QUEUE TABLE +-- Failed messages that exhausted retry attempts. + +CREATE TABLE IF NOT EXISTS queue_dlq ( + -- Auto-incrementing global offset for ordering/acking in DLQ + offset BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + + -- Original topic and partition + topic VARCHAR(255) NOT NULL, + partition_key VARCHAR(255) NOT NULL, + + -- Message identification (for deduplication) + id VARCHAR(255) NOT NULL, + + -- Message data + payload BLOB NOT NULL, + metadata JSON, + + -- Original timestamps (epoch milliseconds) + created_at BIGINT UNSIGNED NOT NULL, + published_at BIGINT UNSIGNED NOT NULL, + + -- DLQ-specific fields + failed_at BIGINT UNSIGNED NOT NULL, + failure_count INT UNSIGNED NOT NULL, + last_error TEXT, + + -- Supports: SELECT ... WHERE topic=? AND partition_key=? AND failed_at>=? 
-- MESSAGES TABLE
-- One shared table stores the messages of every topic; the partition key
-- determines how work is distributed across workers.
-- Example: topic="merge_queue", partition_key="uber/cadence"

CREATE TABLE IF NOT EXISTS queue_messages (
    -- Global auto-incrementing offset, used for ordering
    `offset` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,

    -- Queue type this message belongs to
    topic VARCHAR(255) NOT NULL,

    -- Key that distributes work across workers
    -- Example: repo ID, user ID, tenant ID
    partition_key VARCHAR(255) NOT NULL,

    -- Message identifier
    id VARCHAR(255) NOT NULL,

    -- Message body and optional metadata
    payload BLOB NOT NULL,
    metadata JSON,

    -- Number of retries so far (persisted so it survives across workers)
    retry_count INT UNSIGNED NOT NULL,

    -- Visibility timeout (epoch milliseconds)
    -- The message stays invisible until this timestamp has passed
    invisible_until BIGINT UNSIGNED NOT NULL,

    -- Timestamps (epoch milliseconds)
    created_at BIGINT UNSIGNED NOT NULL,
    published_at BIGINT UNSIGNED NOT NULL,

    PRIMARY KEY (`offset`),

    -- Supports: INSERT ... ON DUPLICATE KEY for idempotent publishes
    -- Also gives efficient lookups when updating/deleting a message by ID
    UNIQUE KEY idx_topic_partition_id (topic, partition_key, id),

    -- Matches the subscriber polling query:
    --   SELECT ... WHERE topic=? AND partition_key=? AND invisible_until<=? ORDER BY offset
    -- used to fetch ready-to-process messages within an assigned partition
    KEY idx_topic_partition_visible_offset (topic, partition_key, invisible_until, `offset`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- CONSUMER OFFSETS TABLE
-- Tracks consumption progress per consumer group + topic + partition.
-- Each partition has independent offset tracking for crash recovery.

CREATE TABLE IF NOT EXISTS queue_offsets (
    -- Consumer group consuming the topic
    consumer_group VARCHAR(255) NOT NULL,

    -- Topic being consumed
    topic VARCHAR(255) NOT NULL,

    -- Partition being consumed
    partition_key VARCHAR(255) NOT NULL,

    -- Last offset that was successfully acked for this partition
    offset_acked BIGINT UNSIGNED NOT NULL,

    -- Last update timestamp (epoch milliseconds)
    updated_at BIGINT UNSIGNED NOT NULL,

    -- Primary key ensures each consumer group has one offset per topic/partition
    -- Supports: INSERT ... ON DUPLICATE KEY UPDATE for idempotent offset updates
    -- Also enables efficient lookups: SELECT ... WHERE consumer_group=? AND topic=? AND partition_key=?
    -- consumer_group is the leftmost column, so SELECT ... WHERE consumer_group=?
    -- (all offsets of one consumer group, e.g. for monitoring or rebalancing)
    -- is served by this key as well; no separate single-column index is needed.
    PRIMARY KEY (consumer_group, topic, partition_key),

    -- Supports: SELECT ... WHERE topic=?
    -- Used for querying all consumer groups consuming a specific topic
    INDEX idx_topic (topic)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
+ +CREATE TABLE IF NOT EXISTS queue_partition_leases ( + -- Consumer group (e.g., "orchestrator") + consumer_group VARCHAR(255) NOT NULL, + + -- Topic being consumed + topic VARCHAR(255) NOT NULL, + + -- Partition that is leased + partition_key VARCHAR(255) NOT NULL, + + -- Worker that owns the lease (e.g., "worker-1") + leased_by VARCHAR(255) NOT NULL, + + -- When lease was acquired (epoch milliseconds) + leased_at BIGINT UNSIGNED NOT NULL, + + -- Last lease renewal timestamp (epoch milliseconds) + -- Used to detect stale leases + lease_renewed_at BIGINT UNSIGNED NOT NULL, + + -- Primary key ensures each partition can only be leased by one worker per consumer group + -- Supports: INSERT ... ON DUPLICATE KEY UPDATE for lease acquisition and renewal + -- Also enables efficient lookups: SELECT ... WHERE consumer_group=? AND topic=? AND partition_key=? + PRIMARY KEY (consumer_group, topic, partition_key), + + -- Supports: SELECT ... WHERE leased_by=? + -- Used for querying all partitions owned by a specific worker (e.g., for graceful shutdown or rebalancing) + INDEX idx_leased_by (leased_by), + + -- Supports: SELECT ... WHERE lease_renewed_at