Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/consumer/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
TopicKeyMerge TopicKey = "merge"
// TopicKeyConclude is the pipeline stage where merged requests are published for conclusion.
TopicKeyConclude TopicKey = "conclude"
// TopicKeyCancel is the pipeline stage where cancellation requests are published for processing.
TopicKeyCancel TopicKey = "cancel"
)

// String returns the topic key as a string.
Expand Down
2 changes: 2 additions & 0 deletions entity/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"batch.go",
"batch_dependent.go",
"build.go",
"cancel.go",
"change_provider.go",
"queue_config.go",
"request.go",
Expand All @@ -20,6 +21,7 @@ go_test(
srcs = [
"batch_test.go",
"build_test.go",
"cancel_test.go",
"request_test.go",
],
embed = [":entity"],
Expand Down
22 changes: 22 additions & 0 deletions entity/cancel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package entity

import "encoding/json"

// Cancel represents a request to cancel a previously submitted land request.
// The object is immutable after creation.
type Cancel struct {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mb CancelRequest?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not critical

// Sqid is the globally unique identifier of the land request to cancel.
Sqid string `json:"sqid"`
}

// ToBytes serializes the Cancel to JSON bytes for queue message payload.
func (c Cancel) ToBytes() ([]byte, error) {
return json.Marshal(c)
}

// CancelFromBytes deserializes a Cancel from JSON bytes.
func CancelFromBytes(data []byte) (Cancel, error) {
var c Cancel
err := json.Unmarshal(data, &c)
return c, err
}
63 changes: 63 additions & 0 deletions entity/cancel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package entity

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCancel_ToBytes(t *testing.T) {
cancel := Cancel{
Sqid: "test-queue/123",
}

data, err := cancel.ToBytes()
require.NoError(t, err)
assert.Contains(t, string(data), "test-queue/123")
}

func TestCancelFromBytes(t *testing.T) {
original := Cancel{
Sqid: "my-queue/999",
}

data, err := original.ToBytes()
require.NoError(t, err)

deserialized, err := CancelFromBytes(data)
require.NoError(t, err)
assert.Equal(t, original, deserialized)
}

func TestCancelFromBytes_InvalidJSON(t *testing.T) {
_, err := CancelFromBytes([]byte(`{"invalid": json"}`))
assert.Error(t, err)
}

func TestCancel_SerializationRoundTrip(t *testing.T) {
tests := []struct {
name string
cancel Cancel
}{
{
name: "standard sqid",
cancel: Cancel{Sqid: "queue1/100"},
},
{
name: "long sqid",
cancel: Cancel{Sqid: "my-long-queue-name/999999"},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
data, err := tt.cancel.ToBytes()
require.NoError(t, err)

deserialized, err := CancelFromBytes(data)
require.NoError(t, err)
assert.Equal(t, tt.cancel, deserialized)
})
}
}
17 changes: 13 additions & 4 deletions example/server/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
// GatewayServer wraps the controller and implements the gRPC service interface
type GatewayServer struct {
pb.UnimplementedSubmitQueueGatewayServer
pingController *controller.PingController
landController *controller.LandController
pingController *controller.PingController
landController *controller.LandController
cancelController *controller.CancelController
}

// Ping delegates to the controller
Expand All @@ -39,6 +40,11 @@ func (s *GatewayServer) Land(ctx context.Context, req *pb.LandRequest) (*pb.Land
return s.landController.Land(ctx, req)
}

// Cancel delegates to the controller
func (s *GatewayServer) Cancel(ctx context.Context, req *pb.CancelRequest) (*pb.CancelResponse, error) {
return s.cancelController.Cancel(ctx, req)
}

func main() {
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "Gateway server failure: %v\n", err)
Expand Down Expand Up @@ -137,9 +143,12 @@ func run() error {
// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
landController := controller.NewLandController(logger.Sugar(), scope, cnt, mysqlQueue.Publisher(), "request")
cancelController := controller.NewCancelController(logger.Sugar(), scope, mysqlQueue.Publisher(), "cancel")

gatewayServer := &GatewayServer{
pingController: pingController,
landController: landController,
pingController: pingController,
landController: landController,
cancelController: cancelController,
}

pb.RegisterSubmitQueueGatewayServer(grpcServer, gatewayServer)
Expand Down
3 changes: 3 additions & 0 deletions gateway/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "controller",
srcs = [
"cancel.go",
"land.go",
"ping.go",
],
importpath = "github.com/uber/submitqueue/gateway/controller",
visibility = ["//visibility:public"],
deps = [
"//core/errs",
"//core/metrics",
"//entity",
"//entity/queue",
"//extension/counter",
Expand All @@ -23,6 +25,7 @@ go_library(
go_test(
name = "controller_test",
srcs = [
"cancel_test.go",
"land_test.go",
"ping_test.go",
],
Expand Down
99 changes: 99 additions & 0 deletions gateway/controller/cancel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package controller

import (
"context"
"fmt"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/entity/queue"
extqueue "github.com/uber/submitqueue/extension/queue"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/zap"
)

// CancelController handles cancel business logic for the gateway
type CancelController struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
publisher extqueue.Publisher
topic string // Topic to publish cancel requests to (e.g., "cancel")
}

// NewCancelController creates a new instance of the gateway cancel controller.
// topic: the queue topic to publish cancel requests to (e.g., "cancel")
func NewCancelController(logger *zap.SugaredLogger, scope tally.Scope, publisher extqueue.Publisher, topic string) *CancelController {
return &CancelController{
logger: logger,
metricsScope: scope,
publisher: publisher,
topic: topic,
}
}

// Cancel handles the cancel request and returns a response.
// It publishes the sqid to the cancel topic for async processing.
// No validation is performed — the orchestrator handles that.
func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (resp *pb.CancelResponse, retErr error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to future us (non blocking): we should discuss if controllers should receive proto structures directly. It seems that the final surfacing of the error should be a responsibility of a service, not a controller.

op := metrics.Begin(c.metricsScope, "cancel")
defer func() { op.Complete(retErr) }()

sqid := req.Sqid
Comment thread
manjari25 marked this conversation as resolved.

if sqid == "" {
return &pb.CancelResponse{
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be INVALID_ARGUMENT + BadMessage first class GRPC error
errdetails.BadRequest
code.InvalidArgument

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/genproto/googleapis/rpc/errdetails"
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The transformation from Go errors to GRPC errors is better to be done at a certain distinct function, means Cancel() function likely wants to wrap another cancelRequest(...) error function which is to return first class validationError.

Error: &pb.Error{Message: "sqid is required"},
}, nil
}

// TODO -
// Look up event store to see if sqid exists -
// - if found and request is already in a terminal state; return cancellation failed with appropriate
// error message.
// - if found and request is not in a terminal state; publish a cancel entity onto the cancel topic
// to be picked up by the orchestrator.
// - if not found; publish a cancel entity onto the cancel topic
// to be picked up by the orchestrator.
cancel := entity.Cancel{
Sqid: sqid,
}

if err := c.publishToQueue(ctx, cancel); err != nil {
c.logger.Errorw("failed to publish cancel request to queue",
"sqid", sqid,
"error", err,
)
return nil, fmt.Errorf("CancelController failed to publish cancel request to queue: %w", err)
}

c.logger.Infow("cancel request published to queue",
"sqid", sqid,
"topic", c.topic,
)

return &pb.CancelResponse{
Sqid: sqid,
CurrentStatus: pb.RequestStatus_CANCELLATION_ACCEPTED,
}, nil
}

// publishToQueue publishes a cancel entity to the cancel queue for async processing.
func (c *CancelController) publishToQueue(ctx context.Context, cancel entity.Cancel) error {
payload, err := cancel.ToBytes()
if err != nil {
return fmt.Errorf("failed to serialize cancel entity: %w", err)
}

// Create queue message
// - Message ID: sqid for idempotency
// - Payload: serialized Cancel entity
// - Partition key: empty (no inherent ordering for cancel requests)
msg := queue.NewMessage(cancel.Sqid, payload, "", nil)

if err := c.publisher.Publish(ctx, c.topic, msg); err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}

return nil
}
100 changes: 100 additions & 0 deletions gateway/controller/cancel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package controller

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/entity/queue"
queuemock "github.com/uber/submitqueue/extension/queue/mock"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
)

func TestNewCancelController(t *testing.T) {
ctrl := gomock.NewController(t)
controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, noopPublisher(ctrl), "cancel")
require.NotNil(t, controller)
}

func TestCancel_ReturnsErrorOnEmptySqid(t *testing.T) {
ctrl := gomock.NewController(t)
controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, noopPublisher(ctrl), "cancel")
ctx := context.Background()

req := &pb.CancelRequest{Sqid: ""}
resp, err := controller.Cancel(ctx, req)

require.NoError(t, err)
assert.Empty(t, resp.Sqid)
assert.Equal(t, pb.RequestStatus_UNKNOWN, resp.CurrentStatus)
require.NotNil(t, resp.Error)
assert.Equal(t, "sqid is required", resp.Error.Message)
}

func TestCancel_ReturnsSqid(t *testing.T) {
ctrl := gomock.NewController(t)
controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, noopPublisher(ctrl), "cancel")
ctx := context.Background()

req := &pb.CancelRequest{Sqid: "test-queue/123"}
resp, err := controller.Cancel(ctx, req)

require.NoError(t, err)
assert.Equal(t, "test-queue/123", resp.Sqid)
assert.Equal(t, pb.RequestStatus_CANCELLATION_ACCEPTED, resp.CurrentStatus)
}

func TestCancel_PublishesToQueue(t *testing.T) {
var publishedTopic string
var publishedMessage queue.Message

ctrl := gomock.NewController(t)
publisher := queuemock.NewMockPublisher(ctrl)
publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, topic string, msg queue.Message) error {
publishedTopic = topic
publishedMessage = msg
return nil
},
)

controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, publisher, "cancel")
ctx := context.Background()

req := &pb.CancelRequest{Sqid: "my-queue/42"}
resp, err := controller.Cancel(ctx, req)

require.NoError(t, err)
assert.Equal(t, "my-queue/42", resp.Sqid)
assert.Equal(t, pb.RequestStatus_CANCELLATION_ACCEPTED, resp.CurrentStatus)

// Verify message was published to the cancel topic
assert.Equal(t, "cancel", publishedTopic)
assert.Equal(t, "my-queue/42", publishedMessage.ID)
assert.Equal(t, "my-queue/42", publishedMessage.PartitionKey)

// Verify payload is a serialized Cancel entity
cancel, err := entity.CancelFromBytes(publishedMessage.Payload)
require.NoError(t, err)
assert.Equal(t, "my-queue/42", cancel.Sqid)
}

func TestCancel_ReturnsErrorOnPublishFailure(t *testing.T) {
ctrl := gomock.NewController(t)
publisher := queuemock.NewMockPublisher(ctrl)
publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable"))

controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, publisher, "cancel")
ctx := context.Background()

req := &pb.CancelRequest{Sqid: "test-queue/999"}
_, err := controller.Cancel(ctx, req)

require.Error(t, err)
}
Loading