-
Notifications
You must be signed in to change notification settings - Fork 0
feat(queue/sql): add MySQL schema and configuration #20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| load("@rules_go//go:def.bzl", "go_library", "go_test") | ||
|
|
||
| # Go library target for the SQL queue package; its only source is config.go and it is publicly visible. | ||
| go_library( | ||
| name = "sql", | ||
| srcs = ["config.go"], | ||
| importpath = "github.com/uber/submitqueue/extensions/queue/sql", | ||
| visibility = ["//visibility:public"], | ||
| ) | ||
|
|
||
| # Unit tests for config.go; embeds the :sql library and depends on testify's assert and require packages. | ||
| go_test( | ||
| name = "sql_test", | ||
| srcs = ["config_test.go"], | ||
| embed = [":sql"], | ||
| deps = [ | ||
| "@com_github_stretchr_testify//assert", | ||
| "@com_github_stretchr_testify//require", | ||
| ], | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,134 @@ | ||
| package sql | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "time" | ||
| ) | ||
|
|
||
// Fixed table names for the single-table design: all topics share these tables.
const (
	// MessagesTableName is the name of the table that stores queue messages.
	MessagesTableName = "queue_messages"

	// PartitionLeasesTableName is the name of the table that stores partition leases.
	PartitionLeasesTableName = "queue_partition_leases"

	// OffsetsTableName is the name of the table that stores consumer offsets.
	OffsetsTableName = "queue_offsets"

	// DLQTableName is the name of the dead letter queue table.
	DLQTableName = "queue_dlq"
)

// Config holds configuration for SQL-based queue.
// DB connection, logger, and metrics are passed separately to NewFactory.
type Config struct {
	// ConsumerGroup identifies this consumer for offset tracking (required).
	ConsumerGroup string

	// WorkerID uniquely identifies this worker instance (required for partition leases).
	// Example: hostname, pod name, UUID, etc.
	WorkerID string

	// PollInterval is how often to poll for new messages.
	PollInterval time.Duration

	// BatchSize is the number of messages to fetch per poll.
	BatchSize int

	// VisibilityTimeout is how long a message is invisible after being fetched.
	// If a worker crashes or gets stuck, the message becomes visible again after this duration.
	VisibilityTimeout time.Duration

	// LeaseRenewalInterval is how often to renew partition leases.
	// It must be shorter than LeaseDuration.
	LeaseRenewalInterval time.Duration

	// LeaseDuration is how long a lease is valid without renewal.
	// Stale leases (not renewed within this duration) can be stolen by other workers.
	LeaseDuration time.Duration

	// Retry configures message retry.
	Retry RetryConfig

	// DLQ configures the dead letter queue.
	DLQ DLQConfig
}

// RetryConfig configures message retry behavior.
type RetryConfig struct {
	// MaxAttempts is the maximum number of processing attempts.
	// After this many retries, the message is moved to the DLQ (if enabled).
	// This includes both visibility timeout retries and explicit Nack retries.
	MaxAttempts int

	// InitialBackoff is the initial backoff duration for explicit Nack retries.
	InitialBackoff time.Duration

	// MaxBackoff is the maximum backoff duration. It must not be shorter than InitialBackoff.
	MaxBackoff time.Duration

	// BackoffMultiplier is the multiplier for exponential backoff. It must be >= 1.0.
	BackoffMultiplier float64
}

// DLQConfig configures the dead letter queue.
type DLQConfig struct {
	// Enabled enables the dead letter queue.
	Enabled bool
}

// DefaultConfig returns a Config for the given consumer group and worker ID
// with sensible defaults for every other field.
func DefaultConfig(consumerGroup, workerID string) Config {
	return Config{
		ConsumerGroup:        consumerGroup,
		WorkerID:             workerID,
		PollInterval:         100 * time.Millisecond,
		BatchSize:            10,
		VisibilityTimeout:    60 * time.Second,
		LeaseRenewalInterval: 10 * time.Second,
		LeaseDuration:        30 * time.Second,
		Retry: RetryConfig{
			MaxAttempts:       3,
			InitialBackoff:    1 * time.Second,
			MaxBackoff:        30 * time.Second,
			BackoffMultiplier: 2.0,
		},
		DLQ: DLQConfig{
			Enabled: true,
		},
	}
}

// Validate checks if the configuration is valid.
//
// It returns an error describing the first invalid field it finds, or nil when
// every field is acceptable. Error messages start with the field name, so they
// are intentionally capitalized.
func (c *Config) Validate() error {
	if c.ConsumerGroup == "" {
		return fmt.Errorf("ConsumerGroup is required")
	}
	if c.WorkerID == "" {
		return fmt.Errorf("WorkerID is required")
	}
	if c.PollInterval <= 0 {
		return fmt.Errorf("PollInterval must be positive")
	}
	if c.BatchSize <= 0 {
		return fmt.Errorf("BatchSize must be positive")
	}
	if c.VisibilityTimeout <= 0 {
		return fmt.Errorf("VisibilityTimeout must be positive")
	}
	if c.LeaseRenewalInterval <= 0 {
		return fmt.Errorf("LeaseRenewalInterval must be positive")
	}
	if c.LeaseDuration <= 0 {
		return fmt.Errorf("LeaseDuration must be positive")
	}
	// A lease must be renewed before it can expire.
	if c.LeaseRenewalInterval >= c.LeaseDuration {
		return fmt.Errorf("LeaseRenewalInterval must be less than LeaseDuration")
	}
	if c.Retry.MaxAttempts < 1 {
		return fmt.Errorf("Retry.MaxAttempts must be at least 1")
	}
	if c.Retry.InitialBackoff <= 0 {
		return fmt.Errorf("Retry.InitialBackoff must be positive")
	}
	if c.Retry.MaxBackoff <= 0 {
		return fmt.Errorf("Retry.MaxBackoff must be positive")
	}
	// A cap below the first delay is contradictory.
	if c.Retry.MaxBackoff < c.Retry.InitialBackoff {
		return fmt.Errorf("Retry.MaxBackoff must be >= Retry.InitialBackoff")
	}
	// NaN compares false against everything, so "m < 1.0" alone would let it
	// through; m != m is true only for NaN.
	if m := c.Retry.BackoffMultiplier; m != m || m < 1.0 {
		return fmt.Errorf("Retry.BackoffMultiplier must be >= 1.0")
	}
	return nil
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| package sql | ||
|
|
||
| import ( | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func TestDefaultConfig(t *testing.T) { | ||
| cfg := DefaultConfig("test-consumer", "test-worker") | ||
|
|
||
| assert.Equal(t, "test-consumer", cfg.ConsumerGroup) | ||
| assert.Equal(t, "test-worker", cfg.WorkerID) | ||
| assert.Equal(t, 100*time.Millisecond, cfg.PollInterval) | ||
| assert.Equal(t, 10, cfg.BatchSize) | ||
| assert.Equal(t, 60*time.Second, cfg.VisibilityTimeout) | ||
| assert.Equal(t, 10*time.Second, cfg.LeaseRenewalInterval) | ||
| assert.Equal(t, 30*time.Second, cfg.LeaseDuration) | ||
| assert.True(t, cfg.DLQ.Enabled) | ||
| assert.Equal(t, 3, cfg.Retry.MaxAttempts) | ||
| assert.Equal(t, 1*time.Second, cfg.Retry.InitialBackoff) | ||
| assert.Equal(t, 30*time.Second, cfg.Retry.MaxBackoff) | ||
| assert.Equal(t, 2.0, cfg.Retry.BackoffMultiplier) | ||
| } | ||
|
|
||
| func TestConfigValidation(t *testing.T) { | ||
| tests := []struct { | ||
| name string | ||
| config Config | ||
| expectError bool | ||
| errorMsg string | ||
| }{ | ||
| { | ||
| name: "valid config", | ||
| config: DefaultConfig("test-consumer", "test-worker"), | ||
| expectError: false, | ||
| }, | ||
| { | ||
| name: "empty consumer group", | ||
| config: Config{ | ||
| ConsumerGroup: "", | ||
| WorkerID: "test-worker", | ||
| PollInterval: 100 * time.Millisecond, | ||
| BatchSize: 10, | ||
| VisibilityTimeout: 60 * time.Second, | ||
| LeaseRenewalInterval: 10 * time.Second, | ||
| LeaseDuration: 30 * time.Second, | ||
| Retry: DefaultConfig("dummy", "dummy").Retry, | ||
| }, | ||
| expectError: true, | ||
| errorMsg: "ConsumerGroup is required", | ||
| }, | ||
| { | ||
| name: "empty worker ID", | ||
| config: Config{ | ||
| ConsumerGroup: "test", | ||
| WorkerID: "", | ||
| PollInterval: 100 * time.Millisecond, | ||
| BatchSize: 10, | ||
| VisibilityTimeout: 60 * time.Second, | ||
| LeaseRenewalInterval: 10 * time.Second, | ||
| LeaseDuration: 30 * time.Second, | ||
| Retry: DefaultConfig("dummy", "dummy").Retry, | ||
| }, | ||
| expectError: true, | ||
| errorMsg: "WorkerID is required", | ||
| }, | ||
| { | ||
| name: "invalid poll interval", | ||
| config: Config{ | ||
| ConsumerGroup: "test", | ||
| WorkerID: "test-worker", | ||
| PollInterval: 0, | ||
| BatchSize: 10, | ||
| VisibilityTimeout: 60 * time.Second, | ||
| LeaseRenewalInterval: 10 * time.Second, | ||
| LeaseDuration: 30 * time.Second, | ||
| Retry: DefaultConfig("dummy", "dummy").Retry, | ||
| }, | ||
| expectError: true, | ||
| errorMsg: "PollInterval must be positive", | ||
| }, | ||
| { | ||
| name: "invalid batch size", | ||
| config: Config{ | ||
| ConsumerGroup: "test", | ||
| WorkerID: "test-worker", | ||
| PollInterval: 100 * time.Millisecond, | ||
| BatchSize: 0, | ||
| VisibilityTimeout: 60 * time.Second, | ||
| LeaseRenewalInterval: 10 * time.Second, | ||
| LeaseDuration: 30 * time.Second, | ||
| Retry: DefaultConfig("dummy", "dummy").Retry, | ||
| }, | ||
| expectError: true, | ||
| errorMsg: "BatchSize must be positive", | ||
| }, | ||
| } | ||
|
|
||
| for _, tt := range tests { | ||
| t.Run(tt.name, func(t *testing.T) { | ||
| err := tt.config.Validate() | ||
| if tt.expectError { | ||
| require.Error(t, err) | ||
| assert.Contains(t, err.Error(), tt.errorMsg) | ||
| } else { | ||
| require.NoError(t, err) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| # Groups the four queue table DDL files into one publicly visible target. | ||
| filegroup( | ||
| name = "schema", | ||
| srcs = [ | ||
| "queue_dlq.sql", | ||
| "queue_messages.sql", | ||
| "queue_offsets.sql", | ||
| "queue_partition_leases.sql", | ||
| ], | ||
| visibility = ["//visibility:public"], | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| -- DEAD LETTER QUEUE TABLE | ||
| -- Failed messages that exhausted retry attempts. | ||
|
|
||
| -- One row per dead-lettered message; (topic, partition_key, id) is unique, so the same message is stored at most once. | ||
| CREATE TABLE IF NOT EXISTS queue_dlq ( | ||
| -- Auto-incrementing global offset for ordering/acking in DLQ | ||
| offset BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, | ||
|
|
||
| -- Original topic and partition | ||
| topic VARCHAR(255) NOT NULL, | ||
| partition_key VARCHAR(255) NOT NULL, | ||
|
|
||
| -- Message identification (for deduplication) | ||
| id VARCHAR(255) NOT NULL, | ||
|
|
||
| -- Message data | ||
| payload BLOB NOT NULL, | ||
| metadata JSON, | ||
|
|
||
| -- Original timestamps (epoch milliseconds) | ||
| created_at BIGINT UNSIGNED NOT NULL, | ||
| published_at BIGINT UNSIGNED NOT NULL, | ||
|
|
||
| -- DLQ-specific fields | ||
| -- NOTE(review): failed_at is presumably epoch milliseconds like the other timestamps - confirm against the writer. | ||
| failed_at BIGINT UNSIGNED NOT NULL, | ||
| failure_count INT UNSIGNED NOT NULL, | ||
| last_error TEXT, | ||
|
|
||
| -- Supports: SELECT ... WHERE topic=? AND partition_key=? AND failed_at>=? ORDER BY failed_at | ||
| -- Used for fetching recently failed messages for a specific topic/partition, e.g., for retrying or monitoring | ||
| INDEX idx_topic_partition_failed (topic, partition_key, failed_at), | ||
|
|
||
| -- Supports: SELECT ... WHERE topic=? AND failed_at>=? ORDER BY failed_at | ||
| -- Used for fetching recently failed messages across all partitions of a topic | ||
| -- NOTE(review): despite its name, this index is keyed on (topic, failed_at), not failed_at alone. | ||
| INDEX idx_failed_at (topic, failed_at), | ||
|
|
||
| -- Unique constraint to prevent duplicate entries for the same message in the DLQ | ||
| -- Supports: INSERT ... ON DUPLICATE KEY to enforce idempotent DLQ operations | ||
| -- Also enables efficient lookups for retrying or inspecting specific failed messages | ||
| UNIQUE KEY idx_topic_partition_id (topic, partition_key, id) | ||
| ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| -- MESSAGES TABLE | ||
| -- Single table for all topics. Partition key determines distribution across workers. | ||
| -- Example: topic="merge_queue", partition_key="uber/cadence" | ||
|
|
||
| -- One row per published message; (topic, partition_key, id) is unique, so re-publishing the same id cannot create a second row. | ||
| CREATE TABLE IF NOT EXISTS queue_messages ( | ||
| -- Auto-incrementing global offset for ordering | ||
| offset BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, | ||
|
|
||
| -- Topic identifies the queue type | ||
| topic VARCHAR(255) NOT NULL, | ||
|
|
||
| -- Partition key for distributing work across workers | ||
| -- Example: repo ID, user ID, tenant ID | ||
| partition_key VARCHAR(255) NOT NULL, | ||
|
|
||
| -- Message identification | ||
| id VARCHAR(255) NOT NULL, | ||
|
|
||
| -- Message data | ||
| payload BLOB NOT NULL, | ||
| metadata JSON, | ||
|
|
||
| -- Retry tracking (persistent across workers) | ||
| -- No DEFAULT is declared, so every INSERT must supply a value. | ||
| retry_count INT UNSIGNED NOT NULL, | ||
|
|
||
| -- Visibility timeout (epoch milliseconds) | ||
| -- Messages invisible until this timestamp expires | ||
| invisible_until BIGINT UNSIGNED NOT NULL, | ||
|
|
||
| -- Timestamps (epoch milliseconds) | ||
| created_at BIGINT UNSIGNED NOT NULL, | ||
| published_at BIGINT UNSIGNED NOT NULL, | ||
|
|
||
| -- Supports: SELECT ... WHERE topic=? AND partition_key=? AND invisible_until<=? ORDER BY offset | ||
| -- Used by subscribers to poll for ready-to-process messages within their assigned partition | ||
| -- NOTE(review): invisible_until is a range predicate, so the index order is (invisible_until, offset) within a partition; | ||
| -- ORDER BY offset likely still needs a sort step - confirm with EXPLAIN. | ||
| INDEX idx_topic_partition_visible_offset (topic, partition_key, invisible_until, offset), | ||
|
behinddwalls marked this conversation as resolved.
|
||
|
|
||
| -- Supports: INSERT ... ON DUPLICATE KEY to enforce idempotent publishes | ||
| -- Also enables efficient lookups for message updates/deletes by ID | ||
| UNIQUE KEY idx_topic_partition_id (topic, partition_key, id) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be primary key instead?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. offset is the primary which is easier to reason it seems as per AI, primarily how it auto-increments and avoids page fragmentation, so having this secondary index is not too bad for this table. |
||
| ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| -- CONSUMER OFFSETS TABLE | ||
| -- Tracks consumption progress per consumer group + topic + partition. | ||
| -- Each partition has independent offset tracking for crash recovery. | ||
|
|
||
-- One row per (consumer_group, topic, partition_key): the last acked offset
-- for that consumer group on that partition.
CREATE TABLE IF NOT EXISTS queue_offsets (
    -- Consumer group consuming the topic
    consumer_group VARCHAR(255) NOT NULL,

    -- Topic being consumed
    topic VARCHAR(255) NOT NULL,

    -- Partition being consumed
    partition_key VARCHAR(255) NOT NULL,

    -- Last offset that was successfully acked for this partition
    offset_acked BIGINT UNSIGNED NOT NULL,

    -- Last update timestamp (epoch milliseconds)
    updated_at BIGINT UNSIGNED NOT NULL,

    -- Primary key ensures each consumer group has one offset per topic/partition
    -- Supports: INSERT ... ON DUPLICATE KEY UPDATE for idempotent offset updates
    -- Also enables efficient lookups: SELECT ... WHERE consumer_group=? AND topic=? AND partition_key=?
    -- Its leftmost prefix also serves SELECT ... WHERE consumer_group=? (all offsets
    -- of a consumer group, e.g., for monitoring or rebalancing), so no separate
    -- index on consumer_group is declared.
    PRIMARY KEY (consumer_group, topic, partition_key),

    -- Supports: SELECT ... WHERE topic=?
    -- Used for querying all consumer groups consuming a specific topic
    INDEX idx_topic (topic)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AI likes to generate string comparisons for errors instead of checking error types (because typically there are no error types); I think this is a bad practice that makes tests fragile.
Just asserting the error is usually ok; when required by code flow, can dedicate an error type and use Is/As semantics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it validates which error is being returned, i.e. that the call is not short-circuiting on some other error in the call chain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the error message is not part of the protocol, this makes the test fragile: it can break for reasons that have nothing to do with the change being made.
If it is part of the protocol, the general convention is for it to be anywhere in the chain and discoverable with errors.Is/errors.As, and I believe the test should follow the convention.