Skip to content

Commit d64e7ab

Browse files
committed
feat(queue/sql): add data access layer stores
## Why? Need data access layer to abstract database operations for message storage, offset tracking, and partition leasing. ## What? - MessageStore: insert, fetch, delete, DLQ, and visibility timeout operations - OffsetStore: consumer offset tracking with atomic ack - PartitionLeaseStore: distributed partition leasing with automatic discovery - Comprehensive test coverage using in-memory MySQL ## Test Plan - All store operations tested (insert, fetch, ack, lease) - Atomic operations verified (visibility timeout, offset updates) - Partition discovery and lease acquisition tested - DLQ operations validated
1 parent 38c7cf2 commit d64e7ab

10 files changed

Lines changed: 2464 additions & 2 deletions

File tree

extensions/queue/sql/BUILD.bazel

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,44 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "sql",
5-
srcs = ["config.go"],
5+
srcs = [
6+
"config.go",
7+
"constants.go",
8+
"message_store.go",
9+
"offset_store.go",
10+
"partition_lease_store.go",
11+
"test_helpers.go",
12+
],
613
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
714
visibility = ["//visibility:public"],
15+
deps = [
16+
"//entities/queue",
17+
"@com_github_dolthub_go_mysql_server//:go-mysql-server",
18+
"@com_github_dolthub_go_mysql_server//memory",
19+
"@com_github_dolthub_go_mysql_server//server",
20+
"@com_github_dolthub_go_mysql_server//sql",
21+
"@com_github_dolthub_vitess//go/mysql",
22+
"@com_github_go_sql_driver_mysql//:mysql",
23+
"@com_github_stretchr_testify//require",
24+
"@com_github_uber_go_tally_v4//:tally",
25+
"@org_uber_go_zap//:zap",
26+
"@org_uber_go_zap//zaptest",
27+
],
828
)
929

1030
go_test(
1131
name = "sql_test",
12-
srcs = ["config_test.go"],
32+
srcs = [
33+
"config_test.go",
34+
"message_store_test.go",
35+
"offset_store_test.go",
36+
"partition_lease_store_test.go",
37+
],
38+
data = ["//schema/queue/mysql:schema.sql"],
1339
embed = [":sql"],
1440
deps = [
41+
"//entities/queue",
42+
"@com_github_go_sql_driver_mysql//:mysql",
1543
"@com_github_stretchr_testify//assert",
1644
"@com_github_stretchr_testify//require",
1745
],

extensions/queue/sql/constants.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package sql
2+
3+
// Common constants for frequently repeated strings across stores
4+
5+
const (
6+
// Tag key (used in every Tagged() call)
7+
tagErrorType = "error_type"
8+
9+
// Common log field names (used extensively across all stores)
10+
logTopic = "topic"
11+
logPartitionKey = "partition_key"
12+
logMessageID = "message_id"
13+
logError = "error"
14+
15+
// Error types used across multiple methods/stores
16+
errorBeginTx = "begin_transaction"
17+
errorCommit = "commit"
18+
)

0 commit comments

Comments
 (0)