Skip to content

Commit 7371c3c

Browse files
authored
feat(queue/sql): add publisher implementation (#22)
## Summary ### Why? Need Publisher implementation to enable message publishing to SQL queue topics. ### What? - Publisher validates topic names and publishes single messages via MessageStore - Thread-safe with RWMutex for concurrent publish calls - Idempotent Close() operation - Comprehensive test coverage for publish, validation, metrics, and concurrency - Single and multiple message publishing tested - Invalid topic names rejected (uppercase, special chars, empty) - Publisher closed state prevents further publishes - Concurrent publish operations verified thread-safe - Context cancellation handled correctly ## Test Plan make test ## Issues ## Stack 1. @ #22 1. #23 1. #21 1. #24 1. #34
1 parent 1176753 commit 7371c3c

10 files changed

Lines changed: 546 additions & 10 deletions

File tree

MODULE.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use_repo(
3939
"org_golang_google_grpc",
4040
"org_golang_google_protobuf",
4141
"org_uber_go_fx",
42+
"org_uber_go_mock",
4243
"org_uber_go_yarpc",
4344
"org_uber_go_zap",
4445
)

extensions/queue/sql/BUILD.bazel

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,37 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "sql",
5-
srcs = ["config.go"],
5+
srcs = [
6+
"config.go",
7+
"mock_stores.go",
8+
"publisher.go",
9+
"stores.go",
10+
"validation.go",
11+
],
612
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
713
visibility = ["//visibility:public"],
14+
deps = [
15+
"//entities/queue",
16+
"@com_github_uber_go_tally_v4//:tally",
17+
"@org_uber_go_mock//gomock",
18+
"@org_uber_go_zap//:zap",
19+
],
820
)
921

1022
go_test(
1123
name = "sql_test",
12-
srcs = ["config_test.go"],
24+
srcs = [
25+
"config_test.go",
26+
"publisher_test.go",
27+
],
1328
embed = [":sql"],
1429
deps = [
30+
"//entities/queue",
31+
"//extensions/queue",
1532
"@com_github_stretchr_testify//assert",
1633
"@com_github_stretchr_testify//require",
34+
"@com_github_uber_go_tally_v4//:tally",
35+
"@org_uber_go_mock//gomock",
36+
"@org_uber_go_zap//zaptest",
1737
],
1838
)

extensions/queue/sql/config.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,6 @@ import (
55
"time"
66
)
77

8-
const (
9-
// Fixed table names for single-table design
10-
MessagesTableName = "queue_messages"
11-
PartitionLeasesTableName = "queue_partition_leases"
12-
OffsetsTableName = "queue_offsets"
13-
DLQTableName = "queue_dlq"
14-
)
15-
168
// Config holds configuration for SQL-based queue.
179
// DB connection, logger, and metrics are passed separately to NewFactory.
1810
type Config struct {

extensions/queue/sql/mock_stores.go

Lines changed: 56 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extensions/queue/sql/publisher.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package sql
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/uber-go/tally/v4"
9+
"go.uber.org/zap"
10+
11+
"github.com/uber/submitqueue/entities/queue"
12+
)
13+
14+
type publisher struct {
15+
config Config
16+
logger *zap.SugaredLogger
17+
metrics tally.Scope
18+
messageStore MessageStore
19+
mu sync.RWMutex
20+
closed bool
21+
}
22+
23+
// NewPublisher creates a publisher with the given configuration and dependencies
24+
func NewPublisher(config Config, logger *zap.SugaredLogger, metrics tally.Scope, messageStore MessageStore) *publisher {
25+
return &publisher{
26+
config: config,
27+
logger: logger,
28+
metrics: metrics,
29+
messageStore: messageStore,
30+
}
31+
}
32+
33+
// Publish sends a message to the specified topic
34+
func (p *publisher) Publish(ctx context.Context, topic string, message queue.Message) error {
35+
// Check if closed (under lock)
36+
p.mu.RLock()
37+
closed := p.closed
38+
p.mu.RUnlock()
39+
40+
if closed {
41+
p.logger.Errorw("publish failure: publisher is closed", "topic", topic)
42+
return fmt.Errorf("publisher is closed")
43+
}
44+
45+
// Validate topic name (SQL-safe)
46+
if err := validateTopicName(topic); err != nil {
47+
p.logger.Errorw("publish failure: invalid topic name", "topic", topic, "error", err)
48+
p.metrics.Tagged(map[string]string{"topic": topic}).Counter("publish_errors").Inc(1)
49+
return fmt.Errorf("publish invalid topic name: %w", err)
50+
}
51+
52+
if err := p.messageStore.Insert(ctx, topic, []queue.Message{message}); err != nil {
53+
p.metrics.Tagged(map[string]string{"topic": topic}).Counter("publish_errors").Inc(1)
54+
p.logger.Errorw("publish failure: message store insert error", "topic", topic, "error", err)
55+
return fmt.Errorf("publish message store insert error: %w", err)
56+
}
57+
58+
p.metrics.Tagged(map[string]string{"topic": topic}).Counter("messages_published").Inc(1)
59+
p.logger.Debugw("published message", "topic", topic, "message_id", message.ID)
60+
61+
return nil
62+
}
63+
64+
// Close gracefully shuts down the publisher
65+
func (p *publisher) Close() error {
66+
p.mu.Lock()
67+
p.closed = true
68+
p.mu.Unlock()
69+
70+
p.logger.Info("publisher closed")
71+
return nil
72+
}

0 commit comments

Comments
 (0)