Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example/server/gateway/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
importpath = "github.com/uber/submitqueue/example/server/gateway",
visibility = ["//visibility:private"],
deps = [
"//core/consumer",
"//extension/counter/mysql",
"//extension/queue/mysql",
"//extension/storage/mysql",
Expand Down
13 changes: 12 additions & 1 deletion example/server/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

_ "github.com/go-sql-driver/mysql"
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
mysqlstorage "github.com/uber/submitqueue/extension/storage/mysql"
Expand Down Expand Up @@ -158,6 +159,16 @@ func run() error {
zap.String("queue_dsn", queueDSN),
)

// Build a publish-only topic registry: gateway only feeds the start of the
// orchestrator pipeline (TopicKeyStart). No subscription is configured
// because the gateway never consumes from the queue.
registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{
{Key: consumer.TopicKeyStart, Name: "start", Queue: mysqlQueue},
})
if err != nil {
return fmt.Errorf("failed to create topic registry: %w", err)
}

// Create gRPC server
grpcServer := grpc.NewServer()

Expand All @@ -166,7 +177,7 @@ func run() error {

// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
landController := controller.NewLandController(logger.Sugar(), scope, cnt, mysqlQueue.Publisher(), requestLogStore, "request")
landController := controller.NewLandController(logger.Sugar(), scope, cnt, requestLogStore, registry)
gatewayServer := &GatewayServer{
pingController: pingController,
landController: landController,
Expand Down
6 changes: 3 additions & 3 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,10 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
return consumer.NewTopicRegistry([]consumer.TopicConfig{
{
Key: consumer.TopicKeyStart,
Name: "request",
Name: "start",
Queue: q,
Subscription: extqueue.DefaultSubscriptionConfig(
subscriberName, "orchestrator-request",
subscriberName, "orchestrator-start",
),
},
{
Expand Down Expand Up @@ -396,7 +396,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
store,
registry,
consumer.TopicKeyStart,
"orchestrator-request",
"orchestrator-start",
)
if err := c.Register(requestController); err != nil {
return fmt.Errorf("failed to register request controller: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion gateway/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ go_library(
importpath = "github.com/uber/submitqueue/gateway/controller",
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//core/errs",
"//entity",
"//entity/queue",
"//extension/counter",
"//extension/queue",
"//extension/storage",
"//gateway/protopb",
"@com_github_uber_go_tally_v4//:tally",
Expand All @@ -29,6 +29,7 @@ go_test(
],
embed = [":controller"],
deps = [
"//core/consumer",
"//entity",
"//entity/queue",
"//extension/counter/mock",
Expand Down
28 changes: 18 additions & 10 deletions gateway/controller/land.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
"time"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/entity/queue"
"github.com/uber/submitqueue/extension/counter"
extqueue "github.com/uber/submitqueue/extension/queue"
"github.com/uber/submitqueue/extension/storage"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/zap"
Expand All @@ -45,21 +45,20 @@ type LandController struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
counter counter.Counter
publisher extqueue.Publisher
requestLogStore storage.RequestLogStore
topic string // Topic to publish requests to (e.g., "request", "land_request")
registry consumer.TopicRegistry
}

// NewLandController creates a new instance of the gateway land controller.
// topic: the queue topic to publish requests to (e.g., "request", "land_request")
func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, publisher extqueue.Publisher, requestLogStore storage.RequestLogStore, topic string) *LandController {
// The controller publishes land requests to the topic registered under
// consumer.TopicKeyStart in the registry.
func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, requestLogStore storage.RequestLogStore, registry consumer.TopicRegistry) *LandController {
return &LandController{
logger: logger,
metricsScope: scope,
counter: counter,
publisher: publisher,
requestLogStore: requestLogStore,
topic: topic,
registry: registry,
}
}

Expand Down Expand Up @@ -130,7 +129,7 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan
c.logger.Infow("request published to queue",
"queue", req.Queue,
"sqid", landRequest.ID,
"topic", c.topic,
"topic_key", consumer.TopicKeyStart,
)
c.metricsScope.Counter("publish_success").Inc(1)

Expand All @@ -153,8 +152,17 @@ func (c *LandController) publishToQueue(ctx context.Context, landRequest entity.
// - Partition key: landRequest.Queue (ensures ordering per queue)
msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil)

// Publish to request topic
if err := c.publisher.Publish(ctx, c.topic, msg); err != nil {
q, ok := c.registry.Queue(consumer.TopicKeyStart)
if !ok {
return fmt.Errorf("no queue registered for topic key %s", consumer.TopicKeyStart)
}

topicName, ok := c.registry.TopicName(consumer.TopicKeyStart)
if !ok {
return fmt.Errorf("no topic name registered for topic key %s", consumer.TopicKeyStart)
}

if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
return fmt.Errorf("failed to publish land request message: %w", err)
}

Expand Down
53 changes: 37 additions & 16 deletions gateway/controller/land_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/entity/queue"
countermock "github.com/uber/submitqueue/extension/counter/mock"
Expand All @@ -32,11 +33,29 @@ import (
"go.uber.org/zap"
)

// noopPublisher returns a mock publisher that succeeds silently.
func noopPublisher(ctrl *gomock.Controller) *queuemock.MockPublisher {
// newTestRegistry builds a single-entry TopicRegistry for TopicKeyStart wired
// to a mock Queue/Publisher and returns both the registry and the publisher
// mock so callers can set EXPECT() on the publisher.
func newTestRegistry(t *testing.T, ctrl *gomock.Controller) (consumer.TopicRegistry, *queuemock.MockPublisher) {
t.Helper()
pub := queuemock.NewMockPublisher(ctrl)
q := queuemock.NewMockQueue(ctrl)
q.EXPECT().Publisher().Return(pub).AnyTimes()

registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{
{Key: consumer.TopicKeyStart, Name: "start", Queue: q},
})
require.NoError(t, err)
return registry, pub
}

// newTestRegistryWithNoopPublisher returns a registry whose publisher silently
// accepts any Publish call. Use for tests that don't care about publish behavior.
func newTestRegistryWithNoopPublisher(t *testing.T, ctrl *gomock.Controller) consumer.TopicRegistry {
t.Helper()
registry, pub := newTestRegistry(t, ctrl)
pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return pub
return registry
}

// noopRequestLogStore returns a mock RequestLogStore that succeeds silently.
Expand All @@ -50,7 +69,7 @@ func TestNewLandController(t *testing.T) {
ctrl := gomock.NewController(t)

cnt := countermock.NewMockCounter(ctrl)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
require.NotNil(t, controller)
}

Expand All @@ -59,7 +78,7 @@ func TestLand_ReturnsSqid(t *testing.T) {

cnt := countermock.NewMockCounter(ctrl)
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -77,7 +96,7 @@ func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) {

cnt := countermock.NewMockCounter(ctrl)
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable"))
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -101,7 +120,7 @@ func TestLand_CounterDomainIncludesQueue(t *testing.T) {
return 1, nil
},
)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -118,7 +137,7 @@ func TestLand_ReturnsErrorOnEmptyQueue(t *testing.T) {
ctrl := gomock.NewController(t)

cnt := countermock.NewMockCounter(ctrl)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -135,7 +154,7 @@ func TestLand_ReturnsErrorOnEmptyChangeUri(t *testing.T) {
ctrl := gomock.NewController(t)

cnt := countermock.NewMockCounter(ctrl)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -152,7 +171,7 @@ func TestLand_ReturnsErrorOnNilChange(t *testing.T) {
ctrl := gomock.NewController(t)

cnt := countermock.NewMockCounter(ctrl)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -173,7 +192,8 @@ func TestLand_PublishesToQueue(t *testing.T) {

cnt := countermock.NewMockCounter(ctrl)
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(123), nil)
publisher := queuemock.NewMockPublisher(ctrl)

registry, publisher := newTestRegistry(t, ctrl)
publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, topic string, msg queue.Message) error {
publishedTopic = topic
Expand All @@ -182,7 +202,7 @@ func TestLand_PublishesToQueue(t *testing.T) {
},
)

controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, noopRequestLogStore(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), registry)
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -195,8 +215,8 @@ func TestLand_PublishesToQueue(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, "test-queue/123", resp.Sqid)

// Verify message was published
assert.Equal(t, "request", publishedTopic)
// Verify message was published to the topic registered under TopicKeyStart
assert.Equal(t, "start", publishedTopic)
assert.Equal(t, "test-queue/123", publishedMessage.ID)
assert.Equal(t, "test-queue", publishedMessage.PartitionKey)

Expand All @@ -214,10 +234,11 @@ func TestLand_ContinuesWhenPublishFails(t *testing.T) {

cnt := countermock.NewMockCounter(ctrl)
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(999), nil)
publisher := queuemock.NewMockPublisher(ctrl)

registry, publisher := newTestRegistry(t, ctrl)
publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable"))

controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, noopRequestLogStore(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), registry)
ctx := context.Background()

req := &pb.LandRequest{
Expand Down
Loading