Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use_repo(
"org_golang_google_grpc",
"org_golang_google_protobuf",
"org_uber_go_fx",
"org_uber_go_mock",
"org_uber_go_yarpc",
"org_uber_go_zap",
)
24 changes: 22 additions & 2 deletions extensions/queue/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,37 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "sql",
srcs = ["config.go"],
srcs = [
"config.go",
"mock_stores.go",
"publisher.go",
"stores.go",
"validation.go",
],
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
visibility = ["//visibility:public"],
deps = [
"//entities/queue",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "sql_test",
srcs = ["config_test.go"],
srcs = [
"config_test.go",
"publisher_test.go",
],
embed = [":sql"],
deps = [
"//entities/queue",
"//extensions/queue",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//zaptest",
],
)
8 changes: 0 additions & 8 deletions extensions/queue/sql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@ import (
"time"
)

const (
// Fixed table names for single-table design
MessagesTableName = "queue_messages"
PartitionLeasesTableName = "queue_partition_leases"
OffsetsTableName = "queue_offsets"
DLQTableName = "queue_dlq"
)

// Config holds configuration for SQL-based queue.
// DB connection, logger, and metrics are passed separately to NewFactory.
type Config struct {
Expand Down
56 changes: 56 additions & 0 deletions extensions/queue/sql/mock_stores.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 72 additions & 0 deletions extensions/queue/sql/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package sql

import (
"context"
"fmt"
"sync"

"github.com/uber-go/tally/v4"
"go.uber.org/zap"

"github.com/uber/submitqueue/entities/queue"
)

type publisher struct {
config Config
logger *zap.SugaredLogger
metrics tally.Scope
messageStore MessageStore
mu sync.RWMutex
closed bool
}

// NewPublisher creates a publisher with the given configuration and dependencies
func NewPublisher(config Config, logger *zap.SugaredLogger, metrics tally.Scope, messageStore MessageStore) *publisher {
return &publisher{
config: config,
logger: logger,
metrics: metrics,
messageStore: messageStore,
}
}

// Publish sends a message to the specified topic
func (p *publisher) Publish(ctx context.Context, topic string, message queue.Message) error {
// Check if closed (under lock)
p.mu.RLock()
closed := p.closed
p.mu.RUnlock()

if closed {
p.logger.Errorw("publish failure: publisher is closed", "topic", topic)
return fmt.Errorf("publisher is closed")
}

// Validate topic name (SQL-safe)
if err := validateTopicName(topic); err != nil {
p.logger.Errorw("publish failure: invalid topic name", "topic", topic, "error", err)
p.metrics.Tagged(map[string]string{"topic": topic}).Counter("publish_errors").Inc(1)
return fmt.Errorf("publish invalid topic name: %w", err)
}

if err := p.messageStore.Insert(ctx, topic, []queue.Message{message}); err != nil {
Comment thread
behinddwalls marked this conversation as resolved.
p.metrics.Tagged(map[string]string{"topic": topic}).Counter("publish_errors").Inc(1)
p.logger.Errorw("publish failure: message store insert error", "topic", topic, "error", err)
return fmt.Errorf("publish message store insert error: %w", err)
}

p.metrics.Tagged(map[string]string{"topic": topic}).Counter("messages_published").Inc(1)
p.logger.Debugw("published message", "topic", topic, "message_id", message.ID)

return nil
}

// Close gracefully shuts down the publisher
func (p *publisher) Close() error {
p.mu.Lock()
p.closed = true
p.mu.Unlock()

p.logger.Info("publisher closed")
return nil
}
Loading