Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions extensions/queue/sql/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

# Go library for the SQL-backed queue extension. It currently consists of
# config.go only and is importable from any package (public visibility).
go_library(
    name = "sql",
    srcs = ["config.go"],
    importpath = "github.com/uber/submitqueue/extensions/queue/sql",
    visibility = ["//visibility:public"],
)

# Unit tests for the :sql library. `embed` compiles the tests into the same
# Go package as the library, so they can reach its unexported identifiers.
go_test(
    name = "sql_test",
    srcs = ["config_test.go"],
    embed = [":sql"],
    deps = [
        "@com_github_stretchr_testify//assert",
        "@com_github_stretchr_testify//require",
    ],
)
134 changes: 134 additions & 0 deletions extensions/queue/sql/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package sql

import (
"fmt"
"time"
)

const (
	// Fixed table names for the single-table design: the names are compile-time
	// constants rather than per-topic or per-deployment settings.

	// MessagesTableName is the name of the table that stores queue messages.
	MessagesTableName = "queue_messages"

	// PartitionLeasesTableName is the name of the table that stores partition leases.
	PartitionLeasesTableName = "queue_partition_leases"

	// OffsetsTableName is the name of the table that stores consumer offsets.
	OffsetsTableName = "queue_offsets"

	// DLQTableName is the name of the dead letter queue table.
	DLQTableName = "queue_dlq"
)

// Config holds configuration for the SQL-based queue.
//
// The DB connection, logger, and metrics are not part of Config; they are
// passed separately to NewFactory. Use DefaultConfig to obtain a populated
// value and Validate to check a hand-built or modified one.
type Config struct {
	// ConsumerGroup identifies this consumer for offset tracking.
	// Required: Validate rejects an empty value.
	ConsumerGroup string

	// WorkerID uniquely identifies this worker instance and is used for
	// partition leases, e.g. a hostname, pod name, or UUID.
	// Required: Validate rejects an empty value.
	WorkerID string

	// PollInterval is how often to poll for new messages.
	// Must be positive. DefaultConfig uses 100ms.
	PollInterval time.Duration

	// BatchSize is the number of messages to fetch per poll.
	// Must be positive. DefaultConfig uses 10.
	BatchSize int

	// VisibilityTimeout is how long a message stays invisible after being
	// fetched. If the worker crashes or gets stuck, the message becomes
	// visible again after this duration.
	// Must be positive. DefaultConfig uses 60s.
	VisibilityTimeout time.Duration

	// LeaseRenewalInterval is how often to renew partition leases.
	// Must be positive and strictly less than LeaseDuration.
	// DefaultConfig uses 10s.
	LeaseRenewalInterval time.Duration

	// LeaseDuration is how long a lease is valid without renewal. Stale
	// leases (not renewed within this duration) can be stolen by other
	// workers.
	// Must be positive. DefaultConfig uses 30s.
	LeaseDuration time.Duration

	// Retry configures how failed messages are retried.
	Retry RetryConfig

	// DLQ configures the dead letter queue.
	DLQ DLQConfig
}

// RetryConfig configures message retry behavior.
type RetryConfig struct {
	// MaxAttempts is the maximum number of processing attempts. After this
	// many retries the message is moved to the DLQ (if enabled). The count
	// includes both visibility-timeout retries and explicit Nack retries.
	// Validate requires a value of at least 1. DefaultConfig uses 3.
	MaxAttempts int

	// InitialBackoff is the initial backoff duration for explicit Nack
	// retries. Must be positive. DefaultConfig uses 1s.
	InitialBackoff time.Duration

	// MaxBackoff is the maximum backoff duration.
	// Must be positive. DefaultConfig uses 30s.
	MaxBackoff time.Duration

	// BackoffMultiplier is the multiplier for exponential backoff.
	// Validate requires a value of at least 1.0. DefaultConfig uses 2.0.
	BackoffMultiplier float64
}

// DLQConfig configures the dead letter queue.
type DLQConfig struct {
	// Enabled turns the dead letter queue on. DefaultConfig sets it to true.
	// Validate does not inspect this field.
	Enabled bool
}

// DefaultConfig returns a Config for the given consumer group and worker ID
// with every other field set to its default value.
//
// The two identifiers are stored as given and are not validated here; call
// Validate on the result if they may be empty.
func DefaultConfig(consumerGroup, workerID string) Config {
	var cfg Config

	// Identity supplied by the caller.
	cfg.ConsumerGroup = consumerGroup
	cfg.WorkerID = workerID

	// Polling and message visibility.
	cfg.PollInterval = 100 * time.Millisecond
	cfg.BatchSize = 10
	cfg.VisibilityTimeout = 60 * time.Second

	// Partition leases: the renewal interval is shorter than the lease
	// duration, as Validate requires.
	cfg.LeaseRenewalInterval = 10 * time.Second
	cfg.LeaseDuration = 30 * time.Second

	// Retry policy: exponential backoff starting at 1s, capped at 30s.
	cfg.Retry.MaxAttempts = 3
	cfg.Retry.InitialBackoff = 1 * time.Second
	cfg.Retry.MaxBackoff = 30 * time.Second
	cfg.Retry.BackoffMultiplier = 2.0

	// The dead letter queue is on by default.
	cfg.DLQ.Enabled = true

	return cfg
}

// Validate checks that the configuration is usable and returns an error
// describing the first violated rule, or nil if every rule holds.
//
// Rules, in the order they are checked:
//   - ConsumerGroup and WorkerID must be non-empty.
//   - PollInterval, BatchSize, VisibilityTimeout, LeaseRenewalInterval and
//     LeaseDuration must be positive.
//   - LeaseRenewalInterval must be strictly less than LeaseDuration.
//   - Retry.MaxAttempts must be at least 1.
//   - Retry.InitialBackoff and Retry.MaxBackoff must be positive, and
//     Retry.MaxBackoff must not be smaller than Retry.InitialBackoff.
//   - Retry.BackoffMultiplier must be a number >= 1.0 (NaN is rejected).
//
// DLQ settings are not validated. Validate does not modify the receiver.
func (c *Config) Validate() error {
	switch {
	case c.ConsumerGroup == "":
		return fmt.Errorf("ConsumerGroup is required")
	case c.WorkerID == "":
		return fmt.Errorf("WorkerID is required")
	case c.PollInterval <= 0:
		return fmt.Errorf("PollInterval must be positive")
	case c.BatchSize <= 0:
		return fmt.Errorf("BatchSize must be positive")
	case c.VisibilityTimeout <= 0:
		return fmt.Errorf("VisibilityTimeout must be positive")
	case c.LeaseRenewalInterval <= 0:
		return fmt.Errorf("LeaseRenewalInterval must be positive")
	case c.LeaseDuration <= 0:
		return fmt.Errorf("LeaseDuration must be positive")
	case c.LeaseRenewalInterval >= c.LeaseDuration:
		return fmt.Errorf("LeaseRenewalInterval must be less than LeaseDuration")
	case c.Retry.MaxAttempts < 1:
		return fmt.Errorf("Retry.MaxAttempts must be at least 1")
	case c.Retry.InitialBackoff <= 0:
		return fmt.Errorf("Retry.InitialBackoff must be positive")
	case c.Retry.MaxBackoff <= 0:
		return fmt.Errorf("Retry.MaxBackoff must be positive")
	case c.Retry.MaxBackoff < c.Retry.InitialBackoff:
		// A cap below the starting value is contradictory.
		return fmt.Errorf("Retry.MaxBackoff must be >= Retry.InitialBackoff")
	case !(c.Retry.BackoffMultiplier >= 1.0):
		// Deliberately a negated ">=" rather than "< 1.0": every ordered
		// comparison with NaN is false, so "< 1.0" would let NaN through.
		return fmt.Errorf("Retry.BackoffMultiplier must be >= 1.0")
	}
	return nil
}
113 changes: 113 additions & 0 deletions extensions/queue/sql/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package sql

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestDefaultConfig verifies that DefaultConfig stores the supplied consumer
// group and worker ID and fills every other field with its default value.
func TestDefaultConfig(t *testing.T) {
	got := DefaultConfig("test-consumer", "test-worker")
	check := assert.New(t)

	// Identity supplied by the caller.
	check.Equal("test-consumer", got.ConsumerGroup)
	check.Equal("test-worker", got.WorkerID)

	// Polling and visibility defaults.
	check.Equal(100*time.Millisecond, got.PollInterval)
	check.Equal(10, got.BatchSize)
	check.Equal(60*time.Second, got.VisibilityTimeout)

	// Lease defaults.
	check.Equal(10*time.Second, got.LeaseRenewalInterval)
	check.Equal(30*time.Second, got.LeaseDuration)

	// Dead letter queue default.
	check.True(got.DLQ.Enabled)

	// Retry defaults.
	retry := got.Retry
	check.Equal(3, retry.MaxAttempts)
	check.Equal(1*time.Second, retry.InitialBackoff)
	check.Equal(30*time.Second, retry.MaxBackoff)
	check.Equal(2.0, retry.BackoffMultiplier)
}

func TestConfigValidation(t *testing.T) {
tests := []struct {
name string
config Config
expectError bool
errorMsg string
}{
{
name: "valid config",
config: DefaultConfig("test-consumer", "test-worker"),
expectError: false,
},
{
name: "empty consumer group",
config: Config{
ConsumerGroup: "",
WorkerID: "test-worker",
PollInterval: 100 * time.Millisecond,
BatchSize: 10,
VisibilityTimeout: 60 * time.Second,
LeaseRenewalInterval: 10 * time.Second,
LeaseDuration: 30 * time.Second,
Retry: DefaultConfig("dummy", "dummy").Retry,
},
expectError: true,
errorMsg: "ConsumerGroup is required",
},
{
name: "empty worker ID",
config: Config{
ConsumerGroup: "test",
WorkerID: "",
PollInterval: 100 * time.Millisecond,
BatchSize: 10,
VisibilityTimeout: 60 * time.Second,
LeaseRenewalInterval: 10 * time.Second,
LeaseDuration: 30 * time.Second,
Retry: DefaultConfig("dummy", "dummy").Retry,
},
expectError: true,
errorMsg: "WorkerID is required",
},
{
name: "invalid poll interval",
config: Config{
ConsumerGroup: "test",
WorkerID: "test-worker",
PollInterval: 0,
BatchSize: 10,
VisibilityTimeout: 60 * time.Second,
LeaseRenewalInterval: 10 * time.Second,
LeaseDuration: 30 * time.Second,
Retry: DefaultConfig("dummy", "dummy").Retry,
},
expectError: true,
errorMsg: "PollInterval must be positive",
},
{
name: "invalid batch size",
config: Config{
ConsumerGroup: "test",
WorkerID: "test-worker",
PollInterval: 100 * time.Millisecond,
BatchSize: 0,
VisibilityTimeout: 60 * time.Second,
LeaseRenewalInterval: 10 * time.Second,
LeaseDuration: 30 * time.Second,
Retry: DefaultConfig("dummy", "dummy").Retry,
},
expectError: true,
errorMsg: "BatchSize must be positive",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.config.Validate()
if tt.expectError {
require.Error(t, err)
assert.Contains(t, err.Error(), tt.errorMsg)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AI likes to generate string comparisons for error instead of error types (because typically there are no error types); I think this is a bad practice that makes tests fragile.
Just asserting that an error occurred is usually OK; when the code flow requires more, define a dedicated error type and use errors.Is/errors.As semantics.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it validates which error is being returned, i.e. that the test is not short-circuiting on some other error in the call chain.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the error message is not part of the protocol, this makes the test fragile: it can break for no reason dictated by the change being made.
If it is part of the protocol, the general convention is for the error to be anywhere in the chain and discoverable with errors.Is/errors.As, and I believe the test should follow that convention.

} else {
require.NoError(t, err)
}
})
}
}
10 changes: 10 additions & 0 deletions extensions/queue/sql/schema/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Groups the SQL queue's table-definition (.sql) files under a single,
# publicly visible label.
filegroup(
    name = "schema",
    srcs = [
        "queue_dlq.sql",
        "queue_messages.sql",
        "queue_offsets.sql",
        "queue_partition_leases.sql",
    ],
    visibility = ["//visibility:public"],
)
40 changes: 40 additions & 0 deletions extensions/queue/sql/schema/queue_dlq.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- DEAD LETTER QUEUE TABLE
-- Holds failed messages that exhausted their retry attempts.
-- A message is identified by (topic, partition_key, id); that triple is
-- unique within this table.

CREATE TABLE IF NOT EXISTS queue_dlq (
    -- Auto-incrementing global offset for ordering/acking in DLQ
    offset BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,

    -- Original topic and partition the message was published to
    topic VARCHAR(255) NOT NULL,
    partition_key VARCHAR(255) NOT NULL,

    -- Message identification (for deduplication)
    id VARCHAR(255) NOT NULL,

    -- Message data; metadata is optional (nullable)
    payload BLOB NOT NULL,
    metadata JSON,

    -- Original timestamps (epoch milliseconds)
    created_at BIGINT UNSIGNED NOT NULL,
    published_at BIGINT UNSIGNED NOT NULL,

    -- DLQ-specific fields
    -- NOTE(review): failed_at is presumably epoch milliseconds like the
    -- timestamps above; confirm against the code that writes it.
    failed_at BIGINT UNSIGNED NOT NULL,
    failure_count INT UNSIGNED NOT NULL,
    -- Optional (nullable) text of the most recent error
    last_error TEXT,

    -- Supports: SELECT ... WHERE topic=? AND partition_key=? AND failed_at>=? ORDER BY failed_at
    -- Used for fetching recently failed messages for a specific topic/partition, e.g., for retrying or monitoring
    INDEX idx_topic_partition_failed (topic, partition_key, failed_at),

    -- Supports: SELECT ... WHERE topic=? AND failed_at>=? ORDER BY failed_at
    -- Used for fetching recently failed messages across all partitions of a topic
    -- NOTE(review): despite its name, this index leads with topic, so it does
    -- not serve queries that filter on failed_at alone.
    INDEX idx_failed_at (topic, failed_at),

    -- Unique constraint to prevent duplicate entries for the same message in the DLQ
    -- Supports: INSERT ... ON DUPLICATE KEY to enforce idempotent DLQ operations
    -- Also enables efficient lookups for retrying or inspecting specific failed messages
    UNIQUE KEY idx_topic_partition_id (topic, partition_key, id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
41 changes: 41 additions & 0 deletions extensions/queue/sql/schema/queue_messages.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- MESSAGES TABLE
-- Single table for all topics. Partition key determines distribution across workers.
-- Example: topic="merge_queue", partition_key="uber/cadence"

CREATE TABLE IF NOT EXISTS queue_messages (
-- Auto-incrementing global offset for ordering
offset BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,

-- Topic identifies the queue type
topic VARCHAR(255) NOT NULL,

-- Partition key for distributing work across workers
-- Example: repo ID, user ID, tenant ID
partition_key VARCHAR(255) NOT NULL,

-- Message identification
id VARCHAR(255) NOT NULL,

-- Message data
payload BLOB NOT NULL,
metadata JSON,

-- Retry tracking (persistent across workers)
retry_count INT UNSIGNED NOT NULL,

-- Visibility timeout (epoch milliseconds)
-- Messages invisible until this timestamp expires
invisible_until BIGINT UNSIGNED NOT NULL,

-- Timestamps (epoch milliseconds)
created_at BIGINT UNSIGNED NOT NULL,
published_at BIGINT UNSIGNED NOT NULL,

-- Supports: SELECT ... WHERE topic=? AND partition_key=? AND invisible_until<=? ORDER BY offset
-- Used by subscribers to poll for ready-to-process messages within their assigned partition
INDEX idx_topic_partition_visible_offset (topic, partition_key, invisible_until, offset),
Comment thread
behinddwalls marked this conversation as resolved.

-- Supports: INSERT ... ON DUPLICATE KEY to enforce idempotent publishes
-- Also enables efficient lookups for message updates/deletes by ID
UNIQUE KEY idx_topic_partition_id (topic, partition_key, id)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be primary key instead?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

offset is the primary key, which, according to the AI, is easier to reason about, mainly because it auto-increments and so avoids page fragmentation; having this as a secondary index is not too bad for this table.

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
33 changes: 33 additions & 0 deletions extensions/queue/sql/schema/queue_offsets.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
-- CONSUMER OFFSETS TABLE
-- Tracks consumption progress per consumer group + topic + partition.
-- Each partition has independent offset tracking for crash recovery.
-- There is exactly one row per (consumer_group, topic, partition_key).

CREATE TABLE IF NOT EXISTS queue_offsets (
    -- Consumer group consuming the topic
    consumer_group VARCHAR(255) NOT NULL,

    -- Topic being consumed
    topic VARCHAR(255) NOT NULL,

    -- Partition being consumed
    partition_key VARCHAR(255) NOT NULL,

    -- Last offset that was successfully acked for this partition
    offset_acked BIGINT UNSIGNED NOT NULL,

    -- Last update timestamp (epoch milliseconds)
    updated_at BIGINT UNSIGNED NOT NULL,

    -- Primary key ensures each consumer group has one offset per topic/partition
    -- Supports: INSERT ... ON DUPLICATE KEY UPDATE for idempotent offset updates
    -- Also enables efficient lookups: SELECT ... WHERE consumer_group=? AND topic=? AND partition_key=?
    PRIMARY KEY (consumer_group, topic, partition_key),

    -- Supports: SELECT ... WHERE consumer_group=?
    -- Used for querying all offsets for a specific consumer group (e.g., for monitoring or rebalancing)
    -- NOTE(review): consumer_group is already the leftmost column of the
    -- primary key, which can serve the same lookup; this index looks
    -- redundant. Confirm nothing references it by name before dropping it.
    INDEX idx_consumer_group (consumer_group),

    -- Supports: SELECT ... WHERE topic=?
    -- Used for querying all consumer groups consuming a specific topic
    INDEX idx_topic (topic)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Loading