Skip to content

Commit 331f173

Browse files
committed
feat(queue/sql): add publisher with gomock testing
- Implement Publisher interface with SQL MessageStore backend - Add gomock-based mocks for MessageStore interface - Use fixed timestamps in tests for repeatability - Optimize mutex usage: only lock for closed flag check - Move logging outside mutex to avoid I/O under lock - Extract validateTopicName to validation.go for sharing - Add comprehensive test coverage for publish, close, validation, and concurrency
1 parent b43bc7d commit 331f173

10 files changed

Lines changed: 558 additions & 2 deletions

File tree

MODULE.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use_repo(
3939
"org_golang_google_grpc",
4040
"org_golang_google_protobuf",
4141
"org_uber_go_fx",
42+
"org_uber_go_mock",
4243
"org_uber_go_yarpc",
4344
"org_uber_go_zap",
4445
)

extensions/queue/sql/BUILD.bazel

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,37 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "sql",
5-
srcs = ["config.go"],
5+
srcs = [
6+
"config.go",
7+
"publisher.go",
8+
"stores.go",
9+
"validation.go",
10+
],
611
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
712
visibility = ["//visibility:public"],
13+
deps = [
14+
"//entities/queue",
15+
"@com_github_uber_go_tally_v4//:tally",
16+
"@org_uber_go_zap//:zap",
17+
],
818
)
919

1020
go_test(
1121
name = "sql_test",
12-
srcs = ["config_test.go"],
22+
srcs = [
23+
"config_test.go",
24+
"publisher_test.go",
25+
],
26+
data = ["//extensions/queue/sql/schema"],
1327
embed = [":sql"],
1428
deps = [
29+
"//entities/queue",
30+
"//extensions/queue",
31+
"//extensions/queue/sql/mocks",
1532
"@com_github_stretchr_testify//assert",
1633
"@com_github_stretchr_testify//require",
34+
"@com_github_uber_go_tally_v4//:tally",
35+
"@org_uber_go_mock//gomock",
36+
"@org_uber_go_zap//zaptest",
1737
],
1838
)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
load("@rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "mocks",
5+
srcs = ["mock_stores.go"],
6+
importpath = "github.com/uber/submitqueue/extensions/queue/sql/mocks",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//entities/queue",
10+
"@org_uber_go_mock//gomock",
11+
],
12+
)

extensions/queue/sql/mocks/mock_stores.go

Lines changed: 56 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extensions/queue/sql/publisher.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package sql
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/uber-go/tally/v4"
9+
"go.uber.org/zap"
10+
11+
"github.com/uber/submitqueue/entities/queue"
12+
)
13+
14+
type publisher struct {
15+
config Config
16+
logger *zap.SugaredLogger
17+
metrics tally.Scope
18+
messageStore MessageStore
19+
mu sync.RWMutex
20+
closed bool
21+
}
22+
23+
// NewPublisher creates a publisher with the given configuration and dependencies
24+
func NewPublisher(config Config, logger *zap.SugaredLogger, metrics tally.Scope, messageStore MessageStore) *publisher {
25+
return &publisher{
26+
config: config,
27+
logger: logger,
28+
metrics: metrics,
29+
messageStore: messageStore,
30+
}
31+
}
32+
33+
// Publish sends a message to the specified topic
34+
func (p *publisher) Publish(ctx context.Context, topic string, message queue.Message) error {
35+
// Check if closed (under lock)
36+
p.mu.RLock()
37+
closed := p.closed
38+
p.mu.RUnlock()
39+
40+
if closed {
41+
p.logger.Errorw("publish failure: publisher is closed", "topic", topic)
42+
return fmt.Errorf("publisher is closed")
43+
}
44+
45+
// Validate topic name (SQL-safe)
46+
if err := validateTopicName(topic); err != nil {
47+
p.logger.Errorw("publish failure: invalid topic name", "topic", topic, "error", err)
48+
p.metrics.Tagged(map[string]string{"topic": topic}).Counter("publish_errors").Inc(1)
49+
return fmt.Errorf("publish failure: invalid topic name. err: %w", err)
50+
}
51+
52+
if err := p.messageStore.Insert(ctx, topic, []queue.Message{message}); err != nil {
53+
p.metrics.Tagged(map[string]string{"topic": topic}).Counter("publish_errors").Inc(1)
54+
p.logger.Errorw("publish failure: message store insert error", "topic", topic, "error", err)
55+
return fmt.Errorf("publish failure: message store insert error. err: %w", err)
56+
}
57+
58+
p.metrics.Tagged(map[string]string{"topic": topic}).Counter("messages_published").Inc(1)
59+
p.logger.Debugw("published message", "topic", topic, "message_id", message.ID)
60+
61+
return nil
62+
}
63+
64+
// Close gracefully shuts down the publisher
65+
func (p *publisher) Close() error {
66+
p.mu.Lock()
67+
p.closed = true
68+
p.mu.Unlock()
69+
70+
p.logger.Info("publisher closed")
71+
return nil
72+
}

0 commit comments

Comments
 (0)