Skip to content

Commit 45eec0a

Browse files
committed
refactor(queue/mysql): immutable log, delivery state, heartbeat store, fair leasing
- Add delivery state store for tracking per-consumer message state separately from the immutable log - Add subscriber heartbeat store for liveness detection and fair partition leasing - Refactor message store to append-only immutable log with contiguous watermark tracking - Refactor partition lease store with heartbeat-based fair leasing - Add batch query support for offset and delivery state operations - Add GC throttling and dead code removal - Fix make fmt to skip read-only pkg/mod directory
1 parent 1cf33fe commit 45eec0a

31 files changed

Lines changed: 3871 additions & 1038 deletions

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ e2e-test: build-all-linux ## Run end-to-end tests (hermetic, auto-builds binarie
8989

9090
fmt: ## Format Go and YAML code
9191
@echo "Formatting Go code..."
92-
@$(BAZEL) run @rules_go//go -- run golang.org/x/tools/cmd/goimports@$(GOIMPORTS_VERSION) -w .
92+
@find . -name '*.go' -not -path './pkg/*' -not -path './bazel-*' | xargs $(BAZEL) run @rules_go//go -- run golang.org/x/tools/cmd/goimports@$(GOIMPORTS_VERSION) -w
9393
@echo "Formatting YAML files..."
9494
@$(BAZEL) run @rules_go//go -- run github.com/google/yamlfmt/cmd/yamlfmt@$(YAMLFMT_VERSION)
9595
@echo "Formatting complete!"

extension/queue/mysql/BUILD.bazel

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ gomock(
88
out = "mock_stores.go",
99
mockgen_tool = _MOCKGEN,
1010
package = "mysql",
11+
self_package = "github.com/uber/submitqueue/extension/queue/mysql",
1112
source = "stores.go",
1213
source_importpath = "github.com/uber/submitqueue/extension/queue/mysql",
1314
)
@@ -17,6 +18,7 @@ go_library(
1718
name = "mysql",
1819
srcs = [
1920
"constants.go",
21+
"delivery_state_store.go",
2022
"errors.go",
2123
"message_store.go",
2224
":mock_stores_src",
@@ -26,10 +28,12 @@ go_library(
2628
"sql.go",
2729
"stores.go",
2830
"subscriber.go",
31+
"subscriber_heartbeat_store.go",
2932
],
3033
importpath = "github.com/uber/submitqueue/extension/queue/mysql",
3134
visibility = ["//visibility:public"],
3235
deps = [
36+
"//core/metrics",
3337
"//entity/queue",
3438
"//extension/queue",
3539
"@com_github_uber_go_tally_v4//:tally",
@@ -41,11 +45,13 @@ go_library(
4145
go_test(
4246
name = "mysql_test",
4347
srcs = [
48+
"delivery_state_store_test.go",
4449
"message_store_test.go",
4550
"offset_store_test.go",
4651
"partition_lease_store_test.go",
4752
"publisher_test.go",
4853
"sql_test.go",
54+
"subscriber_heartbeat_store_test.go",
4955
"subscriber_test.go",
5056
],
5157
embed = [":mysql"],

extension/queue/mysql/constants.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,9 @@ package mysql
1717
// Common constants for frequently repeated strings across stores
1818

1919
const (
20-
// Tag key (used in every Tagged() call)
21-
tagErrorType = "error_type"
22-
2320
// Common log field names (used extensively across all stores)
2421
logTopic = "topic"
2522
logPartitionKey = "partition_key"
2623
logMessageID = "message_id"
2724
logError = "error"
28-
29-
// Error types used across multiple methods/stores
30-
errorBeginTx = "begin_transaction"
31-
errorCommit = "commit"
3225
)

extension/queue/mysql/ctl/commands.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,6 @@ func newTopicStatsCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command {
151151
rows := [][]string{
152152
{"Topic", stats.Topic},
153153
{"Total Messages", strconv.FormatInt(stats.TotalMessages, 10)},
154-
{"Visible Messages", strconv.FormatInt(stats.VisibleMessages, 10)},
155-
{"Invisible Messages", strconv.FormatInt(stats.InvisibleMessages, 10)},
156154
{"DLQ Count", strconv.FormatInt(stats.DLQCount, 10)},
157155
{"Partitions", strconv.FormatInt(stats.PartitionCount, 10)},
158156
{"Consumer Groups", strconv.FormatInt(stats.ConsumerGroupCount, 10)},
@@ -181,16 +179,15 @@ func newListMessagesCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command {
181179
if *jsonOut {
182180
return lib.FormatJSON(os.Stdout, messages)
183181
}
184-
headers := []string{"OFFSET", "ID", "PARTITION", "RETRIES", "INVISIBLE_UNTIL", "CREATED_AT"}
182+
headers := []string{"OFFSET", "ID", "PARTITION", "CREATED_AT", "PUBLISHED_AT"}
185183
var rows [][]string
186184
for _, m := range messages {
187185
rows = append(rows, []string{
188186
strconv.FormatInt(m.Offset, 10),
189187
m.ID,
190188
m.PartitionKey,
191-
strconv.Itoa(m.RetryCount),
192-
lib.FormatMillis(m.InvisibleUntil),
193189
lib.FormatMillis(m.CreatedAt),
190+
lib.FormatMillis(m.PublishedAt),
194191
})
195192
}
196193
lib.FormatTable(os.Stdout, headers, rows)
@@ -226,8 +223,6 @@ func newInspectMessageCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command
226223
{"ID", detail.ID},
227224
{"Topic", detail.Topic},
228225
{"Partition", detail.PartitionKey},
229-
{"Retry Count", strconv.Itoa(detail.RetryCount)},
230-
{"Invisible Until", lib.FormatMillis(detail.InvisibleUntil)},
231226
{"Created At", lib.FormatMillis(detail.CreatedAt)},
232227
{"Published At", lib.FormatMillis(detail.PublishedAt)},
233228
{"Payload", string(detail.Payload)},
@@ -321,14 +316,13 @@ func newListDLQCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command {
321316
if *jsonOut {
322317
return lib.FormatJSON(os.Stdout, messages)
323318
}
324-
headers := []string{"OFFSET", "ID", "PARTITION", "RETRIES", "CREATED_AT"}
319+
headers := []string{"OFFSET", "ID", "PARTITION", "CREATED_AT"}
325320
var rows [][]string
326321
for _, m := range messages {
327322
rows = append(rows, []string{
328323
strconv.FormatInt(m.Offset, 10),
329324
m.ID,
330325
m.PartitionKey,
331-
strconv.Itoa(m.RetryCount),
332326
lib.FormatMillis(m.CreatedAt),
333327
})
334328
}

extension/queue/mysql/ctl/lib/admin.go

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,6 @@ type MessageSummary struct {
4545
Topic string
4646
// PartitionKey determines message distribution
4747
PartitionKey string
48-
// RetryCount tracks retries on the current topic
49-
RetryCount int
50-
// InvisibleUntil is the epoch milliseconds until which the message is hidden
51-
InvisibleUntil int64
5248
// CreatedAt is the epoch milliseconds when the message was created
5349
CreatedAt int64
5450
// PublishedAt is the epoch milliseconds when the message was published
@@ -116,10 +112,6 @@ type TopicStats struct {
116112
Topic string
117113
// TotalMessages is the total number of messages
118114
TotalMessages int64
119-
// VisibleMessages is the count of messages currently visible for consumption
120-
VisibleMessages int64
121-
// InvisibleMessages is the count of messages hidden by visibility timeout
122-
InvisibleMessages int64
123115
// DLQCount is the number of messages in the DLQ for this topic
124116
DLQCount int64
125117
// PartitionCount is the number of distinct partitions
@@ -154,7 +146,6 @@ func (s *AdminStore) ListTopics(ctx context.Context) ([]TopicInfo, error) {
154146
// GetTopicStats returns detailed statistics for a topic.
155147
func (s *AdminStore) GetTopicStats(ctx context.Context, topic string, dlqSuffix string) (TopicStats, error) {
156148
stats := TopicStats{Topic: topic}
157-
nowMs := time.Now().UnixMilli()
158149

159150
// Total messages
160151
err := s.db.QueryRowContext(ctx,
@@ -165,18 +156,6 @@ func (s *AdminStore) GetTopicStats(ctx context.Context, topic string, dlqSuffix
165156
return stats, fmt.Errorf("count total: %w", err)
166157
}
167158

168-
// Visible messages (invisible_until <= now)
169-
err = s.db.QueryRowContext(ctx,
170-
fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE topic = ? AND invisible_until <= ?", mysql.MessagesTableName),
171-
topic, nowMs,
172-
).Scan(&stats.VisibleMessages)
173-
if err != nil {
174-
return stats, fmt.Errorf("count visible: %w", err)
175-
}
176-
177-
// Invisible messages
178-
stats.InvisibleMessages = stats.TotalMessages - stats.VisibleMessages
179-
180159
// DLQ count
181160
dlqTopic := topic + dlqSuffix
182161
err = s.db.QueryRowContext(ctx,
@@ -215,12 +194,12 @@ func (s *AdminStore) ListMessages(ctx context.Context, topic string, partition s
215194

216195
if partition != "" {
217196
rows, err = s.db.QueryContext(ctx,
218-
fmt.Sprintf("SELECT `offset`, id, topic, partition_key, retry_count, invisible_until, created_at, published_at FROM %s WHERE topic = ? AND partition_key = ? ORDER BY `offset` LIMIT ?", mysql.MessagesTableName),
197+
fmt.Sprintf("SELECT `offset`, id, topic, partition_key, created_at, published_at FROM %s WHERE topic = ? AND partition_key = ? ORDER BY `offset` LIMIT ?", mysql.MessagesTableName),
219198
topic, partition, limit,
220199
)
221200
} else {
222201
rows, err = s.db.QueryContext(ctx,
223-
fmt.Sprintf("SELECT `offset`, id, topic, partition_key, retry_count, invisible_until, created_at, published_at FROM %s WHERE topic = ? ORDER BY `offset` LIMIT ?", mysql.MessagesTableName),
202+
fmt.Sprintf("SELECT `offset`, id, topic, partition_key, created_at, published_at FROM %s WHERE topic = ? ORDER BY `offset` LIMIT ?", mysql.MessagesTableName),
224203
topic, limit,
225204
)
226205
}
@@ -232,7 +211,7 @@ func (s *AdminStore) ListMessages(ctx context.Context, topic string, partition s
232211
var messages []MessageSummary
233212
for rows.Next() {
234213
var m MessageSummary
235-
if err := rows.Scan(&m.Offset, &m.ID, &m.Topic, &m.PartitionKey, &m.RetryCount, &m.InvisibleUntil, &m.CreatedAt, &m.PublishedAt); err != nil {
214+
if err := rows.Scan(&m.Offset, &m.ID, &m.Topic, &m.PartitionKey, &m.CreatedAt, &m.PublishedAt); err != nil {
236215
return nil, fmt.Errorf("scan message row: %w", err)
237216
}
238217
messages = append(messages, m)
@@ -246,9 +225,9 @@ func (s *AdminStore) InspectMessage(ctx context.Context, topic string, messageID
246225
var metadataJSON []byte
247226

248227
err := s.db.QueryRowContext(ctx,
249-
fmt.Sprintf("SELECT `offset`, id, topic, partition_key, retry_count, invisible_until, created_at, published_at, payload, metadata, failed_at, failure_count, last_error, original_topic FROM %s WHERE topic = ? AND id = ?", mysql.MessagesTableName),
228+
fmt.Sprintf("SELECT `offset`, id, topic, partition_key, created_at, published_at, payload, metadata, failed_at, failure_count, last_error, original_topic FROM %s WHERE topic = ? AND id = ?", mysql.MessagesTableName),
250229
topic, messageID,
251-
).Scan(&d.Offset, &d.ID, &d.Topic, &d.PartitionKey, &d.RetryCount, &d.InvisibleUntil, &d.CreatedAt, &d.PublishedAt, &d.Payload, &metadataJSON, &d.FailedAt, &d.FailureCount, &d.LastError, &d.OriginalTopic)
230+
).Scan(&d.Offset, &d.ID, &d.Topic, &d.PartitionKey, &d.CreatedAt, &d.PublishedAt, &d.Payload, &metadataJSON, &d.FailedAt, &d.FailureCount, &d.LastError, &d.OriginalTopic)
252231
if err == sql.ErrNoRows {
253232
return d, false, nil
254233
}
@@ -323,7 +302,7 @@ func (s *AdminStore) RequeueDLQ(ctx context.Context, topic string, messageID str
323302
// Insert into original topic with reset fields
324303
nowMs := time.Now().UnixMilli()
325304
_, err = tx.ExecContext(ctx,
326-
fmt.Sprintf("INSERT INTO %s (topic, partition_key, id, payload, metadata, retry_count, invisible_until, created_at, published_at, failed_at, failure_count, last_error, original_topic) VALUES (?, ?, ?, ?, ?, 0, 0, ?, ?, 0, 0, '', '')", mysql.MessagesTableName),
305+
fmt.Sprintf("INSERT INTO %s (topic, partition_key, id, payload, metadata, created_at, published_at, failed_at, failure_count, last_error, original_topic) VALUES (?, ?, ?, ?, ?, ?, ?, 0, 0, '', '')", mysql.MessagesTableName),
327306
topic, partitionKey, messageID, payload, metadataJSON, createdAt, nowMs,
328307
)
329308
if err != nil {

extension/queue/mysql/ctl/lib/admin_test.go

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,6 @@ func TestGetTopicStats(t *testing.T) {
7575
WithArgs("orders").
7676
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(100))
7777

78-
// Visible messages
79-
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM queue_messages WHERE topic = \\? AND invisible_until <= \\?").
80-
WithArgs("orders", sqlmock.AnyArg()).
81-
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(80))
82-
8378
// DLQ count
8479
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM queue_messages WHERE topic = \\?").
8580
WithArgs("orders_dlq").
@@ -99,8 +94,6 @@ func TestGetTopicStats(t *testing.T) {
9994
require.NoError(t, err)
10095
assert.Equal(t, "orders", stats.Topic)
10196
assert.Equal(t, int64(100), stats.TotalMessages)
102-
assert.Equal(t, int64(80), stats.VisibleMessages)
103-
assert.Equal(t, int64(20), stats.InvisibleMessages)
10497
assert.Equal(t, int64(3), stats.DLQCount)
10598
assert.Equal(t, int64(4), stats.PartitionCount)
10699
assert.Equal(t, int64(2), stats.ConsumerGroupCount)
@@ -114,9 +107,9 @@ func TestListMessages(t *testing.T) {
114107

115108
store := NewAdminStore(db)
116109

117-
rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "retry_count", "invisible_until", "created_at", "published_at"}).
118-
AddRow(1, "msg-1", "orders", "repo-1", 0, 0, 1000, 1000).
119-
AddRow(2, "msg-2", "orders", "repo-1", 1, 5000, 2000, 2000)
110+
rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "created_at", "published_at"}).
111+
AddRow(1, "msg-1", "orders", "repo-1", 1000, 1000).
112+
AddRow(2, "msg-2", "orders", "repo-1", 2000, 2000)
120113
mock.ExpectQuery("SELECT .+ FROM queue_messages WHERE topic = \\? ORDER BY `offset` LIMIT \\?").
121114
WithArgs("orders", 50).
122115
WillReturnRows(rows)
@@ -127,7 +120,6 @@ func TestListMessages(t *testing.T) {
127120
assert.Equal(t, "msg-1", messages[0].ID)
128121
assert.Equal(t, int64(1), messages[0].Offset)
129122
assert.Equal(t, "msg-2", messages[1].ID)
130-
assert.Equal(t, 1, messages[1].RetryCount)
131123
assert.NoError(t, mock.ExpectationsWereMet())
132124
}
133125

@@ -138,8 +130,8 @@ func TestListMessagesWithPartition(t *testing.T) {
138130

139131
store := NewAdminStore(db)
140132

141-
rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "retry_count", "invisible_until", "created_at", "published_at"}).
142-
AddRow(1, "msg-1", "orders", "repo-1", 0, 0, 1000, 1000)
133+
rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "created_at", "published_at"}).
134+
AddRow(1, "msg-1", "orders", "repo-1", 1000, 1000)
143135
mock.ExpectQuery("SELECT .+ FROM queue_messages WHERE topic = \\? AND partition_key = \\? ORDER BY `offset` LIMIT \\?").
144136
WithArgs("orders", "repo-1", 10).
145137
WillReturnRows(rows)
@@ -158,8 +150,8 @@ func TestInspectMessage(t *testing.T) {
158150

159151
store := NewAdminStore(db)
160152

161-
rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "retry_count", "invisible_until", "created_at", "published_at", "payload", "metadata", "failed_at", "failure_count", "last_error", "original_topic"}).
162-
AddRow(1, "msg-1", "orders", "repo-1", 0, 0, 1000, 1000, []byte("hello"), []byte(`{"key":"val"}`), 0, 0, "", "")
153+
rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "created_at", "published_at", "payload", "metadata", "failed_at", "failure_count", "last_error", "original_topic"}).
154+
AddRow(1, "msg-1", "orders", "repo-1", 1000, 1000, []byte("hello"), []byte(`{"key":"val"}`), 0, 0, "", "")
163155
mock.ExpectQuery("SELECT .+ FROM queue_messages WHERE topic = \\? AND id = \\?").
164156
WithArgs("orders", "msg-1").
165157
WillReturnRows(rows)
@@ -181,7 +173,7 @@ func TestInspectMessageNotFound(t *testing.T) {
181173

182174
store := NewAdminStore(db)
183175

184-
rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "retry_count", "invisible_until", "created_at", "published_at", "payload", "metadata", "failed_at", "failure_count", "last_error", "original_topic"})
176+
rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "created_at", "published_at", "payload", "metadata", "failed_at", "failure_count", "last_error", "original_topic"})
185177
mock.ExpectQuery("SELECT .+ FROM queue_messages WHERE topic = \\? AND id = \\?").
186178
WithArgs("orders", "missing").
187179
WillReturnRows(rows)

0 commit comments

Comments
 (0)