Skip to content

Commit c19e90d

Browse files
committed
feat(queue/sql): add publisher with gomock testing
To implement the SQL-based queue publisher with proper test coverage using gomock for mocking dependencies, avoiding manual mock implementations. - Add MessageStore interface for message table operations - Implement Publisher using MessageStore for inserting messages - Add gomock-based tests for Publisher (no manual mocks) - Generate gomock mocks for MessageStore interface - Add README.md with instructions for generating mocks Test Plan: - Unit tests pass: `bazel test //extensions/queue/sql:sql_test` - All tests use gomock exclusively, no manual mocks - `make build` and `make test` pass successfully
1 parent 7dcf5fb commit c19e90d

10 files changed

Lines changed: 565 additions & 22 deletions

File tree

MODULE.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use_repo(
3636
"org_golang_google_grpc",
3737
"org_golang_google_protobuf",
3838
"org_uber_go_fx",
39+
"org_uber_go_mock",
3940
"org_uber_go_yarpc",
4041
"org_uber_go_zap",
4142
)

extensions/queue/sql/BUILD.bazel

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,36 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "sql",
5-
srcs = ["config.go"],
5+
srcs = [
6+
"config.go",
7+
"publisher.go",
8+
"stores.go",
9+
],
610
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
711
visibility = ["//visibility:public"],
12+
deps = [
13+
"//entities/queue",
14+
"@com_github_uber_go_tally_v4//:tally",
15+
"@org_uber_go_zap//:zap",
16+
],
817
)
918

1019
go_test(
1120
name = "sql_test",
12-
srcs = ["config_test.go"],
21+
srcs = [
22+
"config_test.go",
23+
"publisher_test.go",
24+
],
25+
data = ["//extensions/queue/sql/schema"],
1326
embed = [":sql"],
1427
deps = [
28+
"//entities/queue",
29+
"//extensions/queue",
30+
"//extensions/queue/sql/mocks",
1531
"@com_github_stretchr_testify//assert",
1632
"@com_github_stretchr_testify//require",
33+
"@com_github_uber_go_tally_v4//:tally",
34+
"@org_uber_go_mock//gomock",
35+
"@org_uber_go_zap//zaptest",
1736
],
1837
)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
load("@rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "mocks",
5+
srcs = ["mock_stores.go"],
6+
importpath = "github.com/uber/submitqueue/extensions/queue/sql/mocks",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//entities/queue",
10+
"@org_uber_go_mock//gomock",
11+
],
12+
)

extensions/queue/sql/mocks/mock_stores.go

Lines changed: 56 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extensions/queue/sql/publisher.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package sql
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/uber-go/tally/v4"
9+
"go.uber.org/zap"
10+
11+
"github.com/uber/submitqueue/entities/queue"
12+
)
13+
14+
type publisher struct {
15+
config Config
16+
logger *zap.SugaredLogger
17+
metrics tally.Scope
18+
messageStore MessageStore
19+
mu sync.RWMutex
20+
closed bool
21+
}
22+
23+
// Publish sends a message to the specified topic
24+
func (p *publisher) Publish(ctx context.Context, topic string, message queue.Message) error {
25+
p.mu.RLock()
26+
defer p.mu.RUnlock()
27+
28+
if p.closed {
29+
p.logger.Errorw("publish failure: publisher is closed", "topic", topic)
30+
return fmt.Errorf("publisher is closed")
31+
}
32+
33+
// Validate topic name (SQL-safe)
34+
if err := validateTopicName(topic); err != nil {
35+
p.logger.Errorw("publish failure: invalid topic name", "topic", topic, "error", err)
36+
p.metrics.Tagged(map[string]string{"topic": topic}).Counter("publish_errors").Inc(1)
37+
return err
38+
}
39+
40+
if err := p.messageStore.Insert(ctx, topic, []queue.Message{message}); err != nil {
41+
p.metrics.Tagged(map[string]string{"topic": topic}).Counter("publish_errors").Inc(1)
42+
p.logger.Errorw("publish failure: message store insert error", "topic", topic, "error", err)
43+
return err
44+
}
45+
46+
p.metrics.Tagged(map[string]string{"topic": topic}).Counter("messages_published").Inc(1)
47+
p.logger.Infow("published message", "topic", topic, "message_id", message.ID)
48+
49+
return nil
50+
}
51+
52+
// Close gracefully shuts down the publisher
53+
func (p *publisher) Close() error {
54+
p.mu.Lock()
55+
defer p.mu.Unlock()
56+
57+
p.closed = true
58+
p.logger.Info("publisher closed")
59+
return nil
60+
}
61+
62+
// validateTopicName ensures topic name is safe for use as SQL table name
63+
func validateTopicName(topic string) error {
64+
if topic == "" {
65+
return fmt.Errorf("topic name cannot be empty")
66+
}
67+
if len(topic) > 255 {
68+
return fmt.Errorf("topic name too long (max 255 characters)")
69+
}
70+
// Only allow lowercase letters, numbers, and underscores
71+
for _, c := range topic {
72+
if !((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_') {
73+
return fmt.Errorf("topic name must contain only lowercase letters, numbers, and underscores")
74+
}
75+
}
76+
return nil
77+
}

0 commit comments

Comments
 (0)