From 1f0f64fbfc17240032d79aa09823c71110906b3a Mon Sep 17 00:00:00 2001 From: manjari Date: Tue, 3 Mar 2026 20:35:34 +0000 Subject: [PATCH 1/4] feat(controller) Add and scaffold Cancel API in gateway --- core/consumer/registry.go | 2 + example/server/gateway/main.go | 17 +++- gateway/controller/BUILD.bazel | 2 + gateway/controller/cancel.go | 81 +++++++++++++++++ gateway/controller/cancel_test.go | 78 ++++++++++++++++ gateway/proto/gateway.proto | 16 ++++ gateway/protopb/gateway.pb.go | 135 ++++++++++++++++++++++++---- gateway/protopb/gateway.pb.yarpc.go | 121 ++++++++++++++++++------- gateway/protopb/gateway_grpc.pb.go | 46 +++++++++- 9 files changed, 441 insertions(+), 57 deletions(-) create mode 100644 gateway/controller/cancel.go create mode 100644 gateway/controller/cancel_test.go diff --git a/core/consumer/registry.go b/core/consumer/registry.go index 945d9ade..10de82f9 100644 --- a/core/consumer/registry.go +++ b/core/consumer/registry.go @@ -32,6 +32,8 @@ const ( TopicKeyMerge TopicKey = "merge" // TopicKeyConclude is the pipeline stage where merged requests are published for conclusion. TopicKeyConclude TopicKey = "conclude" + // TopicKeyCancel is the pipeline stage where cancellation requests are published for processing. + TopicKeyCancel TopicKey = "cancel" ) // String returns the topic key as a string. diff --git a/example/server/gateway/main.go b/example/server/gateway/main.go index 48eec7ba..9472aea5 100644 --- a/example/server/gateway/main.go +++ b/example/server/gateway/main.go @@ -25,8 +25,9 @@ import ( // GatewayServer wraps the controller and implements the gRPC service interface type GatewayServer struct { pb.UnimplementedSubmitQueueGatewayServer - pingController *controller.PingController - landController *controller.LandController + pingController *controller.PingController + landController *controller.LandController + cancelController *controller.CancelController } // Ping delegates to the controller @@ -39,6 +40,11 @@ func (s *GatewayServer) Land(ctx context.Context, req *pb.LandRequest) (*pb.Land return s.landController.Land(ctx, req) } +// Cancel delegates to the controller +func (s *GatewayServer) Cancel(ctx context.Context, req *pb.CancelRequest) (*pb.CancelResponse, error) { + return s.cancelController.Cancel(ctx, req) +} + func main() { if err := run(); err != nil { fmt.Fprintf(os.Stderr, "Gateway server failure: %v\n", err) @@ -137,9 +143,12 @@ func run() error { // Create controllers and wrap them for gRPC pingController := controller.NewPingController(logger, scope) landController := controller.NewLandController(logger.Sugar(), scope, cnt, mysqlQueue.Publisher(), "request") + cancelController := controller.NewCancelController(logger.Sugar(), scope, mysqlQueue.Publisher(), "cancel") + gatewayServer := &GatewayServer{ - pingController: pingController, - landController: landController, + pingController: pingController, + landController: landController, + cancelController: cancelController, } pb.RegisterSubmitQueueGatewayServer(grpcServer, gatewayServer) diff --git a/gateway/controller/BUILD.bazel b/gateway/controller/BUILD.bazel index 5329be0e..e9d0333a 100644 --- a/gateway/controller/BUILD.bazel +++ b/gateway/controller/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "controller", srcs = [ + "cancel.go", "land.go", "ping.go", ], @@ -23,6 +24,7 @@ go_library( go_test( name = "controller_test", srcs = [ + "cancel_test.go", "land_test.go", "ping_test.go", ], diff --git a/gateway/controller/cancel.go b/gateway/controller/cancel.go new file mode 100644 index 00000000..545d84de --- /dev/null +++ b/gateway/controller/cancel.go @@ -0,0 +1,81 @@ +package controller + +import ( + "context" + "fmt" + "time" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/entity/queue" + extqueue "github.com/uber/submitqueue/extension/queue" + pb "github.com/uber/submitqueue/gateway/protopb" + "go.uber.org/zap" +) + +// CancelController handles cancel business logic for the gateway +type CancelController struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + publisher extqueue.Publisher + topic string // Topic to publish cancel requests to (e.g., "cancel") +} + +// NewCancelController creates a new instance of the gateway cancel controller. +// topic: the queue topic to publish cancel requests to (e.g., "cancel") +func NewCancelController(logger *zap.SugaredLogger, scope tally.Scope, publisher extqueue.Publisher, topic string) *CancelController { + return &CancelController{ + logger: logger, + metricsScope: scope, + publisher: publisher, + topic: topic, + } +} + +// Cancel handles the cancel request and returns a response. +// It publishes the sqid to the cancel topic for async processing. +// No validation is performed — the orchestrator handles that. +func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (*pb.CancelResponse, error) { + start := time.Now() + defer func() { + c.metricsScope.Timer("cancel_request_latency").Record(time.Since(start)) + }() + + c.metricsScope.Counter("cancel_request_count").Inc(1) + + sqid := req.Sqid + + if err := c.publishToQueue(ctx, sqid); err != nil { + c.logger.Errorw("failed to publish cancel request to queue", + "sqid", sqid, + "error", err, + ) + return nil, fmt.Errorf("CancelController failed to publish cancel request to queue: %w", err) + } + + c.logger.Infow("cancel request published to queue", + "sqid", sqid, + "topic", c.topic, + ) + c.metricsScope.Counter("cancel_publish_success").Inc(1) + + return &pb.CancelResponse{ + Sqid: sqid, + }, nil +} + +// publishToQueue publishes a cancel request to the cancel queue for async processing. +func (c *CancelController) publishToQueue(ctx context.Context, sqid string) error { + payload := []byte(sqid) + + // Create queue message + // - Message ID: sqid for idempotency + // - Payload: sqid as bytes + // - Partition key: sqid (ensures ordering per request) + msg := queue.NewMessage(sqid, payload, sqid, nil) + + if err := c.publisher.Publish(ctx, c.topic, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} diff --git a/gateway/controller/cancel_test.go b/gateway/controller/cancel_test.go new file mode 100644 index 00000000..17930688 --- /dev/null +++ b/gateway/controller/cancel_test.go @@ -0,0 +1,78 @@ +package controller + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/entity/queue" + queuemock "github.com/uber/submitqueue/extension/queue/mock" + pb "github.com/uber/submitqueue/gateway/protopb" + "go.uber.org/mock/gomock" + "go.uber.org/zap" +) + +func TestNewCancelController(t *testing.T) { + ctrl := gomock.NewController(t) + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, noopPublisher(ctrl), "cancel") + require.NotNil(t, controller) +} + +func TestCancel_ReturnsSqid(t *testing.T) { + ctrl := gomock.NewController(t) + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, noopPublisher(ctrl), "cancel") + ctx := context.Background() + + req := &pb.CancelRequest{Sqid: "test-queue/123"} + resp, err := controller.Cancel(ctx, req) + + require.NoError(t, err) + assert.Equal(t, "test-queue/123", resp.Sqid) +} + +func TestCancel_PublishesToQueue(t *testing.T) { + var publishedTopic string + var publishedMessage queue.Message + + ctrl := gomock.NewController(t) + publisher := queuemock.NewMockPublisher(ctrl) + publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg queue.Message) error { + publishedTopic = topic + publishedMessage = msg + return nil + }, + ) + + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, publisher, "cancel") + ctx := context.Background() + + req := &pb.CancelRequest{Sqid: "my-queue/42"} + resp, err := controller.Cancel(ctx, req) + + require.NoError(t, err) + assert.Equal(t, "my-queue/42", resp.Sqid) + + // Verify message was published to the cancel topic + assert.Equal(t, "cancel", publishedTopic) + assert.Equal(t, "my-queue/42", publishedMessage.ID) + assert.Equal(t, "my-queue/42", publishedMessage.PartitionKey) + assert.Equal(t, []byte("my-queue/42"), publishedMessage.Payload) +} + +func TestCancel_ReturnsErrorOnPublishFailure(t *testing.T) { + ctrl := gomock.NewController(t) + publisher := queuemock.NewMockPublisher(ctrl) + publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable")) + + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, publisher, "cancel") + ctx := context.Background() + + req := &pb.CancelRequest{Sqid: "test-queue/999"} + _, err := controller.Cancel(ctx, req) + + require.Error(t, err) +} diff --git a/gateway/proto/gateway.proto b/gateway/proto/gateway.proto index e1495193..409a553e 100644 --- a/gateway/proto/gateway.proto +++ b/gateway/proto/gateway.proto @@ -73,6 +73,18 @@ message LandResponse { string sqid = 1; } +// CancelRequest defines a request to cancel a previously submitted land request. +message CancelRequest { + // Globally unique identifier for the land request to cancel, as returned by Land. + string sqid = 1; +} + +// CancelResponse defines the response to a cancel request. +message CancelResponse { + // Globally unique identifier for the land request that was cancelled. + string sqid = 1; +} + // *************** // Error messages, returned as `google.rpc.Status` messages. // *************** @@ -103,4 +115,8 @@ service SubmitQueueGateway { // Land lands a set of code changes into a target branch, performing the necessary validations across all other changes in the queue. // The processing is asynchronous and returns a LandResponse immediately. The land request is processed in the background. rpc Land(LandRequest) returns (LandResponse) {} + + // Cancel cancels a previously submitted land request. The cancellation is asynchronous and + // published to a queue for processing. No validation is performed on whether the request exists. + rpc Cancel(CancelRequest) returns (CancelResponse) {} } diff --git a/gateway/protopb/gateway.pb.go b/gateway/protopb/gateway.pb.go index cfc144d5..571e4298 100644 --- a/gateway/protopb/gateway.pb.go +++ b/gateway/protopb/gateway.pb.go @@ -364,6 +364,98 @@ func (x *LandResponse) GetSqid() string { return "" } +// CancelRequest defines a request to cancel a previously submitted land request. +type CancelRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Globally unique identifier for the land request to cancel, as returned by Land. + Sqid string `protobuf:"bytes,1,opt,name=sqid,proto3" json:"sqid,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CancelRequest) Reset() { + *x = CancelRequest{} + mi := &file_gateway_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CancelRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CancelRequest) ProtoMessage() {} + +func (x *CancelRequest) ProtoReflect() protoreflect.Message { + mi := &file_gateway_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CancelRequest.ProtoReflect.Descriptor instead. +func (*CancelRequest) Descriptor() ([]byte, []int) { + return file_gateway_proto_rawDescGZIP(), []int{5} +} + +func (x *CancelRequest) GetSqid() string { + if x != nil { + return x.Sqid + } + return "" +} + +// CancelResponse defines the response to a cancel request. +type CancelResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Globally unique identifier for the land request that was cancelled. + Sqid string `protobuf:"bytes,1,opt,name=sqid,proto3" json:"sqid,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CancelResponse) Reset() { + *x = CancelResponse{} + mi := &file_gateway_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CancelResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CancelResponse) ProtoMessage() {} + +func (x *CancelResponse) ProtoReflect() protoreflect.Message { + mi := &file_gateway_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CancelResponse.ProtoReflect.Descriptor instead. +func (*CancelResponse) Descriptor() ([]byte, []int) { + return file_gateway_proto_rawDescGZIP(), []int{6} +} + +func (x *CancelResponse) GetSqid() string { + if x != nil { + return x.Sqid + } + return "" +} + // Generic error with metadata. Each custom error type should extend this message. type Error struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -375,7 +467,7 @@ type Error struct { func (x *Error) Reset() { *x = Error{} - mi := &file_gateway_proto_msgTypes[5] + mi := &file_gateway_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -387,7 +479,7 @@ func (x *Error) String() string { func (*Error) ProtoMessage() {} func (x *Error) ProtoReflect() protoreflect.Message { - mi := &file_gateway_proto_msgTypes[5] + mi := &file_gateway_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -400,7 +492,7 @@ func (x *Error) ProtoReflect() protoreflect.Message { // Deprecated: Use Error.ProtoReflect.Descriptor instead. func (*Error) Descriptor() ([]byte, []int) { - return file_gateway_proto_rawDescGZIP(), []int{5} + return file_gateway_proto_rawDescGZIP(), []int{7} } func (x *Error) GetMessage() string { @@ -423,7 +515,7 @@ type UnrecognizedQueueError struct { func (x *UnrecognizedQueueError) Reset() { *x = UnrecognizedQueueError{} - mi := &file_gateway_proto_msgTypes[6] + mi := &file_gateway_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -435,7 +527,7 @@ func (x *UnrecognizedQueueError) String() string { func (*UnrecognizedQueueError) ProtoMessage() {} func (x *UnrecognizedQueueError) ProtoReflect() protoreflect.Message { - mi := &file_gateway_proto_msgTypes[6] + mi := &file_gateway_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -448,7 +540,7 @@ func (x *UnrecognizedQueueError) ProtoReflect() protoreflect.Message { // Deprecated: Use UnrecognizedQueueError.ProtoReflect.Descriptor instead. func (*UnrecognizedQueueError) Descriptor() ([]byte, []int) { - return file_gateway_proto_rawDescGZIP(), []int{6} + return file_gateway_proto_rawDescGZIP(), []int{8} } func (x *UnrecognizedQueueError) GetError() *Error { @@ -484,6 +576,10 @@ const file_gateway_proto_rawDesc = "" + "\x06change\x18\x02 \x01(\v2'.uber.devexp.submitqueue.gateway.ChangeR\x06change\x12E\n" + "\bstrategy\x18\x04 \x01(\x0e2).uber.devexp.submitqueue.gateway.StrategyR\bstrategy\"\"\n" + "\fLandResponse\x12\x12\n" + + "\x04sqid\x18\x01 \x01(\tR\x04sqid\"#\n" + + "\rCancelRequest\x12\x12\n" + + "\x04sqid\x18\x01 \x01(\tR\x04sqid\"$\n" + + "\x0eCancelResponse\x12\x12\n" + "\x04sqid\x18\x01 \x01(\tR\x04sqid\"!\n" + "\x05Error\x12\x18\n" + "\amessage\x18\x01 \x01(\tR\amessage\"l\n" + @@ -495,10 +591,11 @@ const file_gateway_proto_rawDesc = "" + "\n" + "\x06REBASE\x10\x01\x12\x11\n" + "\rSQUASH_REBASE\x10\x02\x12\t\n" + - "\x05MERGE\x10\x032\xe2\x01\n" + + "\x05MERGE\x10\x032\xcf\x02\n" + "\x12SubmitQueueGateway\x12e\n" + "\x04Ping\x12,.uber.devexp.submitqueue.gateway.PingRequest\x1a-.uber.devexp.submitqueue.gateway.PingResponse\"\x00\x12e\n" + - "\x04Land\x12,.uber.devexp.submitqueue.gateway.LandRequest\x1a-.uber.devexp.submitqueue.gateway.LandResponse\"\x00Bb\n" + + "\x04Land\x12,.uber.devexp.submitqueue.gateway.LandRequest\x1a-.uber.devexp.submitqueue.gateway.LandResponse\"\x00\x12k\n" + + "\x06Cancel\x12..uber.devexp.submitqueue.gateway.CancelRequest\x1a/.uber.devexp.submitqueue.gateway.CancelResponse\"\x00Bb\n" + "#com.uber.devexp.submitqueue.gatewayB\fGatewayProtoP\x01Z+github.com/uber/submitqueue/gateway/protopbb\x06proto3" var ( @@ -514,7 +611,7 @@ func file_gateway_proto_rawDescGZIP() []byte { } var file_gateway_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_gateway_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_gateway_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_gateway_proto_goTypes = []any{ (Strategy)(0), // 0: uber.devexp.submitqueue.gateway.Strategy (*PingRequest)(nil), // 1: uber.devexp.submitqueue.gateway.PingRequest @@ -522,19 +619,23 @@ var file_gateway_proto_goTypes = []any{ (*Change)(nil), // 3: uber.devexp.submitqueue.gateway.Change (*LandRequest)(nil), // 4: uber.devexp.submitqueue.gateway.LandRequest (*LandResponse)(nil), // 5: uber.devexp.submitqueue.gateway.LandResponse - (*Error)(nil), // 6: uber.devexp.submitqueue.gateway.Error - (*UnrecognizedQueueError)(nil), // 7: uber.devexp.submitqueue.gateway.UnrecognizedQueueError + (*CancelRequest)(nil), // 6: uber.devexp.submitqueue.gateway.CancelRequest + (*CancelResponse)(nil), // 7: uber.devexp.submitqueue.gateway.CancelResponse + (*Error)(nil), // 8: uber.devexp.submitqueue.gateway.Error + (*UnrecognizedQueueError)(nil), // 9: uber.devexp.submitqueue.gateway.UnrecognizedQueueError } var file_gateway_proto_depIdxs = []int32{ 3, // 0: uber.devexp.submitqueue.gateway.LandRequest.change:type_name -> uber.devexp.submitqueue.gateway.Change 0, // 1: uber.devexp.submitqueue.gateway.LandRequest.strategy:type_name -> uber.devexp.submitqueue.gateway.Strategy - 6, // 2: uber.devexp.submitqueue.gateway.UnrecognizedQueueError.error:type_name -> uber.devexp.submitqueue.gateway.Error + 8, // 2: uber.devexp.submitqueue.gateway.UnrecognizedQueueError.error:type_name -> uber.devexp.submitqueue.gateway.Error 1, // 3: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Ping:input_type -> uber.devexp.submitqueue.gateway.PingRequest 4, // 4: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Land:input_type -> uber.devexp.submitqueue.gateway.LandRequest - 2, // 5: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Ping:output_type -> uber.devexp.submitqueue.gateway.PingResponse - 5, // 6: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Land:output_type -> uber.devexp.submitqueue.gateway.LandResponse - 5, // [5:7] is the sub-list for method output_type - 3, // [3:5] is the sub-list for method input_type + 6, // 5: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Cancel:input_type -> uber.devexp.submitqueue.gateway.CancelRequest + 2, // 6: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Ping:output_type -> uber.devexp.submitqueue.gateway.PingResponse + 5, // 7: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Land:output_type -> uber.devexp.submitqueue.gateway.LandResponse + 7, // 8: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Cancel:output_type -> uber.devexp.submitqueue.gateway.CancelResponse + 6, // [6:9] is the sub-list for method output_type + 3, // [3:6] is the sub-list for method input_type 3, // [3:3] is the sub-list for extension type_name 3, // [3:3] is the sub-list for extension extendee 0, // [0:3] is the sub-list for field type_name @@ -551,7 +652,7 @@ func file_gateway_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_gateway_proto_rawDesc), len(file_gateway_proto_rawDesc)), NumEnums: 1, - NumMessages: 7, + NumMessages: 9, NumExtensions: 0, NumServices: 1, }, diff --git a/gateway/protopb/gateway.pb.yarpc.go b/gateway/protopb/gateway.pb.yarpc.go index 17760cd6..ca8b152c 100644 --- a/gateway/protopb/gateway.pb.yarpc.go +++ b/gateway/protopb/gateway.pb.yarpc.go @@ -24,6 +24,7 @@ var _ = ioutil.NopCloser type SubmitQueueGatewayYARPCClient interface { Ping(context.Context, *PingRequest, ...yarpc.CallOption) (*PingResponse, error) Land(context.Context, *LandRequest, ...yarpc.CallOption) (*LandResponse, error) + Cancel(context.Context, *CancelRequest, ...yarpc.CallOption) (*CancelResponse, error) } func newSubmitQueueGatewayYARPCClient(clientConfig transport.ClientConfig, anyResolver jsonpb.AnyResolver, options ...protobuf.ClientOption) SubmitQueueGatewayYARPCClient { @@ -46,6 +47,7 @@ func NewSubmitQueueGatewayYARPCClient(clientConfig transport.ClientConfig, optio type SubmitQueueGatewayYARPCServer interface { Ping(context.Context, *PingRequest) (*PingResponse, error) Land(context.Context, *LandRequest) (*LandResponse, error) + Cancel(context.Context, *CancelRequest) (*CancelResponse, error) } type buildSubmitQueueGatewayYARPCProceduresParams struct { @@ -79,6 +81,16 @@ func buildSubmitQueueGatewayYARPCProcedures(params buildSubmitQueueGatewayYARPCP }, ), }, + { + MethodName: "Cancel", + Handler: protobuf.NewUnaryHandler( + protobuf.UnaryHandlerParams{ + Handle: handler.Cancel, + NewRequest: newSubmitQueueGatewayServiceCancelYARPCRequest, + AnyResolver: params.AnyResolver, + }, + ), + }, }, OnewayHandlerParams: []protobuf.BuildProceduresOnewayHandlerParams{}, StreamHandlerParams: []protobuf.BuildProceduresStreamHandlerParams{}, @@ -221,6 +233,18 @@ func (c *_SubmitQueueGatewayYARPCCaller) Land(ctx context.Context, request *Land return response, err } +func (c *_SubmitQueueGatewayYARPCCaller) Cancel(ctx context.Context, request *CancelRequest, options ...yarpc.CallOption) (*CancelResponse, error) { + responseMessage, err := c.streamClient.Call(ctx, "Cancel", request, newSubmitQueueGatewayServiceCancelYARPCResponse, options...) + if responseMessage == nil { + return nil, err + } + response, ok := responseMessage.(*CancelResponse) + if !ok { + return nil, protobuf.CastError(emptySubmitQueueGatewayServiceCancelYARPCResponse, responseMessage) + } + return response, err +} + type _SubmitQueueGatewayYARPCHandler struct { server SubmitQueueGatewayYARPCServer } @@ -257,6 +281,22 @@ func (h *_SubmitQueueGatewayYARPCHandler) Land(ctx context.Context, requestMessa return response, err } +func (h *_SubmitQueueGatewayYARPCHandler) Cancel(ctx context.Context, requestMessage proto.Message) (proto.Message, error) { + var request *CancelRequest + var ok bool + if requestMessage != nil { + request, ok = requestMessage.(*CancelRequest) + if !ok { + return nil, protobuf.CastError(emptySubmitQueueGatewayServiceCancelYARPCRequest, requestMessage) + } + } + response, err := h.server.Cancel(ctx, request) + if response == nil { + return nil, err + } + return response, err +} + func newSubmitQueueGatewayServicePingYARPCRequest() proto.Message { return &PingRequest{} } @@ -273,46 +313,59 @@ func newSubmitQueueGatewayServiceLandYARPCResponse() proto.Message { return &LandResponse{} } +func newSubmitQueueGatewayServiceCancelYARPCRequest() proto.Message { + return &CancelRequest{} +} + +func newSubmitQueueGatewayServiceCancelYARPCResponse() proto.Message { + return &CancelResponse{} +} + var ( - emptySubmitQueueGatewayServicePingYARPCRequest = &PingRequest{} - emptySubmitQueueGatewayServicePingYARPCResponse = &PingResponse{} - emptySubmitQueueGatewayServiceLandYARPCRequest = &LandRequest{} - emptySubmitQueueGatewayServiceLandYARPCResponse = &LandResponse{} + emptySubmitQueueGatewayServicePingYARPCRequest = &PingRequest{} + emptySubmitQueueGatewayServicePingYARPCResponse = &PingResponse{} + emptySubmitQueueGatewayServiceLandYARPCRequest = &LandRequest{} + emptySubmitQueueGatewayServiceLandYARPCResponse = &LandResponse{} + emptySubmitQueueGatewayServiceCancelYARPCRequest = &CancelRequest{} + emptySubmitQueueGatewayServiceCancelYARPCResponse = &CancelResponse{} ) var yarpcFileDescriptorClosuref1a937782ebbded5 = [][]byte{ // gateway.proto []byte{ - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0x5d, 0x6f, 0xd3, 0x3e, - 0x14, 0xc6, 0x9b, 0xbe, 0xad, 0x3d, 0xe9, 0xfe, 0xea, 0xdf, 0x42, 0x28, 0x9a, 0x26, 0xd1, 0x19, - 0x89, 0x95, 0xb7, 0x54, 0x2a, 0xb7, 0x48, 0xa8, 0x85, 0x30, 0x2e, 0x06, 0xea, 0x1c, 0x7a, 0xc3, - 0xcd, 0xe4, 0xa4, 0x47, 0x69, 0x24, 0xf2, 0x52, 0xdb, 0x19, 0x8c, 0x7b, 0x3e, 0x0d, 0xdf, 0x8a, - 0x4f, 0x82, 0xe2, 0xb8, 0x5b, 0x6e, 0x20, 0xbd, 0x3b, 0x76, 0xce, 0x2f, 0xcf, 0x73, 0x1e, 0xdb, - 0x70, 0x1c, 0x71, 0x85, 0xdf, 0xf8, 0xad, 0x9b, 0x8b, 0x4c, 0x65, 0xe4, 0x51, 0x11, 0xa0, 0x70, - 0x37, 0x78, 0x83, 0xdf, 0x73, 0x57, 0x16, 0x41, 0x12, 0xab, 0x5d, 0x81, 0x05, 0xba, 0xa6, 0x8d, - 0x9e, 0x83, 0xbd, 0x8a, 0xd3, 0x88, 0xe1, 0xae, 0x40, 0xa9, 0x88, 0x03, 0x47, 0x09, 0x4a, 0xc9, - 0x23, 0x74, 0xac, 0x89, 0x35, 0x1d, 0xb2, 0xfd, 0x92, 0xfe, 0xb4, 0x60, 0x54, 0x75, 0xca, 0x3c, - 0x4b, 0x25, 0xfe, 0xbd, 0x95, 0x9c, 0xc1, 0x48, 0xa2, 0xb8, 0x89, 0x43, 0xbc, 0x4e, 0x79, 0x82, - 0x4e, 0x5b, 0x7f, 0xb6, 0xcd, 0xde, 0x27, 0x9e, 0x20, 0x39, 0x85, 0xa1, 0x8a, 0x13, 0x94, 0x8a, - 0x27, 0xb9, 0xd3, 0x99, 0x58, 0xd3, 0x0e, 0xbb, 0xdf, 0x20, 0x27, 0x30, 0xd8, 0x66, 0x52, 0x69, - 0xb8, 0xab, 0xe1, 0xbb, 0x35, 0x3d, 0x85, 0xfe, 0xdb, 0x2d, 0x4f, 0x23, 0x24, 0x04, 0xba, 0x85, - 0x88, 0xa5, 0x63, 0x4d, 0x3a, 0xd3, 0x21, 0xd3, 0x35, 0xfd, 0x65, 0x81, 0x7d, 0xc9, 0xd3, 0xcd, - 0x7e, 0x9e, 0x07, 0xd0, 0xd3, 0xf3, 0x1a, 0x8b, 0xd5, 0x82, 0xbc, 0x81, 0x7e, 0xa8, 0xff, 0xa1, - 0xad, 0xd9, 0xf3, 0x73, 0xb7, 0x21, 0x26, 0xb7, 0x92, 0x64, 0x06, 0x23, 0x1e, 0x0c, 0xa4, 0x12, - 0x5c, 0x61, 0x74, 0xab, 0x0d, 0xfe, 0x37, 0x7f, 0xda, 0xf8, 0x0b, 0xdf, 0x00, 0xec, 0x0e, 0xa5, - 0x14, 0x46, 0x95, 0x59, 0x13, 0x29, 0x81, 0xae, 0xdc, 0xc5, 0x1b, 0x63, 0x56, 0xd7, 0xf4, 0x0c, - 0x7a, 0x9e, 0x10, 0x99, 0xf8, 0xc7, 0xd1, 0x7c, 0x85, 0x87, 0xeb, 0x54, 0x60, 0x98, 0x45, 0x69, - 0xfc, 0x03, 0x37, 0x57, 0xa5, 0x6c, 0xc5, 0xbc, 0x86, 0x1e, 0x96, 0x85, 0x26, 0xec, 0xf9, 0x93, - 0x46, 0x93, 0x1a, 0x63, 0x15, 0x74, 0x1f, 0x5e, 0xbb, 0x16, 0xde, 0xb3, 0x05, 0x0c, 0xf6, 0xa3, - 0x10, 0x1b, 0x8e, 0xde, 0x79, 0xef, 0x17, 0xeb, 0xcb, 0xcf, 0xe3, 0x16, 0x01, 0xe8, 0x33, 0x6f, - 0xb9, 0xf0, 0xbd, 0xb1, 0x45, 0xfe, 0x87, 0x63, 0xff, 0x6a, 0xbd, 0xf0, 0x3f, 0x5c, 0x9b, 0xad, - 0x36, 0x19, 0x42, 0xef, 0xa3, 0xc7, 0x2e, 0xbc, 0x71, 0x67, 0xfe, 0xdb, 0x02, 0xe2, 0x6b, 0x75, - 0xed, 0xf5, 0xa2, 0x12, 0x27, 0x08, 0xdd, 0xf2, 0x86, 0x91, 0x17, 0x8d, 0x36, 0x6b, 0x57, 0xf6, - 0xe4, 0xe5, 0x81, 0xdd, 0x55, 0xc6, 0xb4, 0x55, 0xca, 0x94, 0xa9, 0x1f, 0x20, 0x53, 0xbb, 0x49, - 0x07, 0xc8, 0xd4, 0x8f, 0x92, 0xb6, 0x96, 0x01, 0x3c, 0x0e, 0xb3, 0xa4, 0x89, 0x5a, 0x8e, 0xcc, - 0xf4, 0xab, 0xf2, 0xbd, 0xae, 0xac, 0x2f, 0xcf, 0xa3, 0x58, 0x6d, 0x8b, 0xc0, 0x0d, 0xb3, 0x64, - 0x56, 0xb2, 0xb3, 0x1a, 0x34, 0x33, 0xd0, 0x4c, 0x3f, 0xee, 0x3c, 0x08, 0xfa, 0xba, 0x78, 0xf5, - 0x27, 0x00, 0x00, 0xff, 0xff, 0xe8, 0x4d, 0xf1, 0x97, 0xf6, 0x03, 0x00, 0x00, + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x94, 0xcb, 0x6e, 0xd3, 0x4c, + 0x14, 0xc7, 0xe3, 0x5c, 0xdc, 0xe4, 0x38, 0xa9, 0xf2, 0x8d, 0x3e, 0x21, 0x2b, 0xaa, 0x44, 0xea, + 0x22, 0x1a, 0x6e, 0xb6, 0x14, 0xb6, 0x48, 0x28, 0x29, 0xa6, 0x2c, 0x0a, 0x4a, 0x6d, 0xb2, 0x61, + 0x53, 0x8d, 0x9d, 0x23, 0xc7, 0xa2, 0xbe, 0xc4, 0x33, 0x2e, 0x94, 0x3d, 0x4f, 0xc3, 0xc3, 0xf0, + 0x4a, 0xc8, 0xe3, 0x71, 0xeb, 0x05, 0xd4, 0xd9, 0x9d, 0x19, 0x9f, 0xdf, 0xfc, 0xff, 0xe7, 0x22, + 0xc3, 0x28, 0xa0, 0x1c, 0xbf, 0xd1, 0x5b, 0x33, 0xcd, 0x12, 0x9e, 0x90, 0xc7, 0xb9, 0x87, 0x99, + 0xb9, 0xc1, 0x1b, 0xfc, 0x9e, 0x9a, 0x2c, 0xf7, 0xa2, 0x90, 0xef, 0x72, 0xcc, 0xd1, 0x94, 0x69, + 0xc6, 0x29, 0x68, 0xab, 0x30, 0x0e, 0x1c, 0xdc, 0xe5, 0xc8, 0x38, 0xd1, 0xe1, 0x20, 0x42, 0xc6, + 0x68, 0x80, 0xba, 0x32, 0x55, 0x66, 0x03, 0xa7, 0x3a, 0x1a, 0x3f, 0x15, 0x18, 0x96, 0x99, 0x2c, + 0x4d, 0x62, 0x86, 0xff, 0x4e, 0x25, 0xc7, 0x30, 0x64, 0x98, 0xdd, 0x84, 0x3e, 0x5e, 0xc5, 0x34, + 0x42, 0xbd, 0x2d, 0x3e, 0x6b, 0xf2, 0xee, 0x13, 0x8d, 0x90, 0x1c, 0xc1, 0x80, 0x87, 0x11, 0x32, + 0x4e, 0xa3, 0x54, 0xef, 0x4c, 0x95, 0x59, 0xc7, 0xb9, 0xbf, 0x20, 0x13, 0xe8, 0x6f, 0x13, 0xc6, + 0x05, 0xdc, 0x15, 0xf0, 0xdd, 0xd9, 0x38, 0x02, 0xf5, 0x6c, 0x4b, 0xe3, 0x00, 0x09, 0x81, 0x6e, + 0x9e, 0x85, 0x4c, 0x57, 0xa6, 0x9d, 0xd9, 0xc0, 0x11, 0xb1, 0xf1, 0x4b, 0x01, 0xed, 0x82, 0xc6, + 0x9b, 0xaa, 0x9e, 0xff, 0xa1, 0x27, 0xea, 0x95, 0x16, 0xcb, 0x03, 0x79, 0x0b, 0xaa, 0x2f, 0xde, + 0x10, 0xd6, 0xb4, 0xf9, 0xa9, 0xd9, 0xd0, 0x26, 0xb3, 0x94, 0x74, 0x24, 0x46, 0x6c, 0xe8, 0x33, + 0x9e, 0x51, 0x8e, 0xc1, 0xad, 0x30, 0x78, 0x38, 0x7f, 0xd6, 0xf8, 0x84, 0x2b, 0x01, 0xe7, 0x0e, + 0x35, 0x0c, 0x18, 0x96, 0x66, 0x65, 0x4b, 0x09, 0x74, 0xd9, 0x2e, 0xdc, 0x48, 0xb3, 0x22, 0x36, + 0x4e, 0x60, 0x74, 0x46, 0x63, 0x1f, 0xaf, 0xab, 0x92, 0xfe, 0x96, 0xf4, 0x04, 0x0e, 0xab, 0xa4, + 0x07, 0x9e, 0x3a, 0x86, 0x9e, 0x9d, 0x65, 0x49, 0xf6, 0xc0, 0x94, 0xaf, 0xe1, 0xd1, 0x3a, 0xce, + 0xd0, 0x4f, 0x82, 0x38, 0xfc, 0x81, 0x9b, 0xcb, 0xa2, 0x82, 0x92, 0x79, 0x03, 0x3d, 0x2c, 0x02, + 0x41, 0x68, 0xf3, 0xa7, 0x8d, 0xf5, 0x0a, 0xcc, 0x29, 0xa1, 0xfb, 0x39, 0xb4, 0x6b, 0x73, 0x78, + 0xbe, 0x80, 0x7e, 0xd5, 0x15, 0xa2, 0xc1, 0xc1, 0x3b, 0xfb, 0xfd, 0x62, 0x7d, 0xf1, 0x79, 0xdc, + 0x22, 0x00, 0xaa, 0x63, 0x2f, 0x17, 0xae, 0x3d, 0x56, 0xc8, 0x7f, 0x30, 0x72, 0x2f, 0xd7, 0x0b, + 0xf7, 0xc3, 0x95, 0xbc, 0x6a, 0x93, 0x01, 0xf4, 0x3e, 0xda, 0xce, 0xb9, 0x3d, 0xee, 0xcc, 0x7f, + 0xb7, 0x81, 0xb8, 0x42, 0x5d, 0x78, 0x3d, 0x2f, 0xc5, 0x09, 0x42, 0xb7, 0x58, 0x56, 0xf2, 0xb2, + 0xd1, 0x66, 0x6d, 0xfb, 0x27, 0xaf, 0xf6, 0xcc, 0x2e, 0x7b, 0x6c, 0xb4, 0x0a, 0x99, 0x62, 0x80, + 0x7b, 0xc8, 0xd4, 0x96, 0x72, 0x0f, 0x99, 0xfa, 0x56, 0x18, 0x2d, 0xf2, 0x15, 0xd4, 0x72, 0xbc, + 0xc4, 0x6c, 0xde, 0xd4, 0xfa, 0xb2, 0x4c, 0xac, 0xbd, 0xf3, 0x2b, 0xb1, 0xa5, 0x07, 0x27, 0x7e, + 0x12, 0x35, 0x71, 0xcb, 0xa1, 0x6c, 0xf5, 0xaa, 0xf8, 0xcf, 0xac, 0x94, 0x2f, 0x2f, 0x82, 0x90, + 0x6f, 0x73, 0xcf, 0xf4, 0x93, 0xc8, 0x2a, 0x58, 0xab, 0x06, 0x59, 0x12, 0xb2, 0xc4, 0x4f, 0x29, + 0xf5, 0x3c, 0x55, 0x04, 0xaf, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0xf4, 0xdb, 0xc5, 0xbd, 0xae, + 0x04, 0x00, 0x00, }, } diff --git a/gateway/protopb/gateway_grpc.pb.go b/gateway/protopb/gateway_grpc.pb.go index 04820f4d..695ba3bc 100644 --- a/gateway/protopb/gateway_grpc.pb.go +++ b/gateway/protopb/gateway_grpc.pb.go @@ -19,8 +19,9 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - SubmitQueueGateway_Ping_FullMethodName = "/uber.devexp.submitqueue.gateway.SubmitQueueGateway/Ping" - SubmitQueueGateway_Land_FullMethodName = "/uber.devexp.submitqueue.gateway.SubmitQueueGateway/Land" + SubmitQueueGateway_Ping_FullMethodName = "/uber.devexp.submitqueue.gateway.SubmitQueueGateway/Ping" + SubmitQueueGateway_Land_FullMethodName = "/uber.devexp.submitqueue.gateway.SubmitQueueGateway/Land" + SubmitQueueGateway_Cancel_FullMethodName = "/uber.devexp.submitqueue.gateway.SubmitQueueGateway/Cancel" ) // SubmitQueueGatewayClient is the client API for SubmitQueueGateway service. @@ -34,6 +35,9 @@ type SubmitQueueGatewayClient interface { // Land lands a set of code changes into a target branch, performing the necessary validations across all other changes in the queue. // The processing is asynchronous and returns a LandResponse immediately. The land request is processed in the background. Land(ctx context.Context, in *LandRequest, opts ...grpc.CallOption) (*LandResponse, error) + // Cancel cancels a previously submitted land request. The cancellation is asynchronous and + // published to a queue for processing. No validation is performed on whether the request exists. + Cancel(ctx context.Context, in *CancelRequest, opts ...grpc.CallOption) (*CancelResponse, error) } type submitQueueGatewayClient struct { @@ -64,6 +68,16 @@ func (c *submitQueueGatewayClient) Land(ctx context.Context, in *LandRequest, op return out, nil } +func (c *submitQueueGatewayClient) Cancel(ctx context.Context, in *CancelRequest, opts ...grpc.CallOption) (*CancelResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(CancelResponse) + err := c.cc.Invoke(ctx, SubmitQueueGateway_Cancel_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // SubmitQueueGatewayServer is the server API for SubmitQueueGateway service. // All implementations must embed UnimplementedSubmitQueueGatewayServer // for forward compatibility. @@ -75,6 +89,9 @@ type SubmitQueueGatewayServer interface { // Land lands a set of code changes into a target branch, performing the necessary validations across all other changes in the queue. // The processing is asynchronous and returns a LandResponse immediately. The land request is processed in the background. Land(context.Context, *LandRequest) (*LandResponse, error) + // Cancel cancels a previously submitted land request. The cancellation is asynchronous and + // published to a queue for processing. No validation is performed on whether the request exists. + Cancel(context.Context, *CancelRequest) (*CancelResponse, error) mustEmbedUnimplementedSubmitQueueGatewayServer() } @@ -91,6 +108,9 @@ func (UnimplementedSubmitQueueGatewayServer) Ping(context.Context, *PingRequest) func (UnimplementedSubmitQueueGatewayServer) Land(context.Context, *LandRequest) (*LandResponse, error) { return nil, status.Error(codes.Unimplemented, "method Land not implemented") } +func (UnimplementedSubmitQueueGatewayServer) Cancel(context.Context, *CancelRequest) (*CancelResponse, error) { + return nil, status.Error(codes.Unimplemented, "method Cancel not implemented") +} func (UnimplementedSubmitQueueGatewayServer) mustEmbedUnimplementedSubmitQueueGatewayServer() {} func (UnimplementedSubmitQueueGatewayServer) testEmbeddedByValue() {} @@ -148,6 +168,24 @@ func _SubmitQueueGateway_Land_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } +func _SubmitQueueGateway_Cancel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CancelRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SubmitQueueGatewayServer).Cancel(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: SubmitQueueGateway_Cancel_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SubmitQueueGatewayServer).Cancel(ctx, req.(*CancelRequest)) + } + return interceptor(ctx, in, info, handler) +} + // SubmitQueueGateway_ServiceDesc is the grpc.ServiceDesc for SubmitQueueGateway service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -163,6 +201,10 @@ var SubmitQueueGateway_ServiceDesc = grpc.ServiceDesc{ MethodName: "Land", Handler: _SubmitQueueGateway_Land_Handler, }, + { + MethodName: "Cancel", + Handler: _SubmitQueueGateway_Cancel_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "gateway.proto", From 165a2bf559e6d2c7c911e117659dc0ae9b190a36 Mon Sep 17 00:00:00 2001 From: manjari Date: Tue, 3 Mar 2026 20:37:43 +0000 Subject: [PATCH 2/4] udpate comment --- gateway/controller/cancel.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gateway/controller/cancel.go b/gateway/controller/cancel.go index 545d84de..0956a3fd 100644 --- a/gateway/controller/cancel.go +++ b/gateway/controller/cancel.go @@ -44,6 +44,8 @@ func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (* sqid := req.Sqid + // TODO: Insert the request to the event store + if err := c.publishToQueue(ctx, sqid); err != nil { c.logger.Errorw("failed to publish cancel request to queue", "sqid", sqid, From ff0ef2051f0e4b1b0a1de7541672154c467b4005 Mon Sep 17 00:00:00 2001 From: manjari Date: Wed, 4 Mar 2026 01:00:50 +0000 Subject: [PATCH 3/4] use core/metrics --- gateway/controller/BUILD.bazel | 1 + gateway/controller/cancel.go | 13 ++++--------- gateway/controller/land.go | 13 ++++--------- 3 files changed, 9 insertions(+), 18 deletions(-) diff --git a/gateway/controller/BUILD.bazel b/gateway/controller/BUILD.bazel index e9d0333a..e96c2a95 100644 --- a/gateway/controller/BUILD.bazel +++ b/gateway/controller/BUILD.bazel @@ -11,6 +11,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/errs", + "//core/metrics", "//entity", "//entity/queue", "//extension/counter", diff --git a/gateway/controller/cancel.go b/gateway/controller/cancel.go index 0956a3fd..ab372b11 100644 --- a/gateway/controller/cancel.go +++ b/gateway/controller/cancel.go @@ -3,9 +3,9 @@ package controller import ( "context" "fmt" - "time" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity/queue" extqueue "github.com/uber/submitqueue/extension/queue" pb "github.com/uber/submitqueue/gateway/protopb" @@ -34,13 +34,9 @@ func NewCancelController(logger *zap.SugaredLogger, scope tally.Scope, publisher // Cancel handles the cancel request and returns a response. // It publishes the sqid to the cancel topic for async processing. // No validation is performed — the orchestrator handles that. -func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (*pb.CancelResponse, error) { - start := time.Now() - defer func() { - c.metricsScope.Timer("cancel_request_latency").Record(time.Since(start)) - }() - - c.metricsScope.Counter("cancel_request_count").Inc(1) +func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (resp *pb.CancelResponse, retErr error) { + op := metrics.Begin(c.metricsScope, "cancel") + defer func() { op.Complete(retErr) }() sqid := req.Sqid @@ -58,7 +54,6 @@ func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (* "sqid", sqid, "topic", c.topic, ) - c.metricsScope.Counter("cancel_publish_success").Inc(1) return &pb.CancelResponse{ Sqid: sqid, diff --git a/gateway/controller/land.go b/gateway/controller/land.go index fe5d861d..1eb70060 100644 --- a/gateway/controller/land.go +++ b/gateway/controller/land.go @@ -4,10 +4,10 @@ import ( "context" "errors" "fmt" - "time" "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/counter" @@ -47,13 +47,9 @@ func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter cou } // Land handles the land request and returns a response -func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error) { - start := time.Now() - defer func() { - c.metricsScope.Timer("land_request_latency").Record(time.Since(start)) - }() - - c.metricsScope.Counter("land_request_count").Inc(1) +func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *pb.LandResponse, retErr error) { + op := metrics.Begin(c.metricsScope, "land") + defer func() { op.Complete(retErr) }() // Validate required fields. if req.Queue == "" { @@ -117,7 +113,6 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan "sqid", request.ID, "topic", c.topic, ) - c.metricsScope.Counter("publish_success").Inc(1) return &pb.LandResponse{ Sqid: request.ID, From 65738db1ec136eeefffea501204f00845261c247 Mon Sep 17 00:00:00 2001 From: manjari Date: Wed, 4 Mar 2026 18:28:00 +0000 Subject: [PATCH 4/4] address comments --- entity/BUILD.bazel | 2 + entity/cancel.go | 22 ++++ entity/cancel_test.go | 63 +++++++++++ gateway/controller/cancel.go | 39 +++++-- gateway/controller/cancel_test.go | 24 ++++- gateway/proto/gateway.proto | 32 +++++- gateway/protopb/gateway.pb.go | 158 +++++++++++++++++++++++----- gateway/protopb/gateway.pb.yarpc.go | 74 +++++++------ gateway/protopb/gateway_grpc.pb.go | 16 ++- 9 files changed, 352 insertions(+), 78 deletions(-) create mode 100644 entity/cancel.go create mode 100644 entity/cancel_test.go diff --git a/entity/BUILD.bazel b/entity/BUILD.bazel index ccd288d0..2bcf95d7 100644 --- a/entity/BUILD.bazel +++ b/entity/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "batch.go", "batch_dependent.go", "build.go", + "cancel.go", "change_provider.go", "queue_config.go", "request.go", @@ -20,6 +21,7 @@ go_test( srcs = [ "batch_test.go", "build_test.go", + "cancel_test.go", "request_test.go", ], embed = [":entity"], diff --git a/entity/cancel.go b/entity/cancel.go new file mode 100644 index 00000000..a024508d --- /dev/null +++ b/entity/cancel.go @@ -0,0 +1,22 @@ +package entity + +import "encoding/json" + +// Cancel represents a request to cancel a previously submitted land request. +// The object is immutable after creation. +type Cancel struct { + // Sqid is the globally unique identifier of the land request to cancel. + Sqid string `json:"sqid"` +} + +// ToBytes serializes the Cancel to JSON bytes for queue message payload. +func (c Cancel) ToBytes() ([]byte, error) { + return json.Marshal(c) +} + +// CancelFromBytes deserializes a Cancel from JSON bytes. +func CancelFromBytes(data []byte) (Cancel, error) { + var c Cancel + err := json.Unmarshal(data, &c) + return c, err +} diff --git a/entity/cancel_test.go b/entity/cancel_test.go new file mode 100644 index 00000000..d6756804 --- /dev/null +++ b/entity/cancel_test.go @@ -0,0 +1,63 @@ +package entity + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCancel_ToBytes(t *testing.T) { + cancel := Cancel{ + Sqid: "test-queue/123", + } + + data, err := cancel.ToBytes() + require.NoError(t, err) + assert.Contains(t, string(data), "test-queue/123") +} + +func TestCancelFromBytes(t *testing.T) { + original := Cancel{ + Sqid: "my-queue/999", + } + + data, err := original.ToBytes() + require.NoError(t, err) + + deserialized, err := CancelFromBytes(data) + require.NoError(t, err) + assert.Equal(t, original, deserialized) +} + +func TestCancelFromBytes_InvalidJSON(t *testing.T) { + _, err := CancelFromBytes([]byte(`{"invalid": json"}`)) + assert.Error(t, err) +} + +func TestCancel_SerializationRoundTrip(t *testing.T) { + tests := []struct { + name string + cancel Cancel + }{ + { + name: "standard sqid", + cancel: Cancel{Sqid: "queue1/100"}, + }, + { + name: "long sqid", + cancel: Cancel{Sqid: "my-long-queue-name/999999"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + data, err := tt.cancel.ToBytes() + require.NoError(t, err) + + deserialized, err := CancelFromBytes(data) + require.NoError(t, err) + assert.Equal(t, tt.cancel, deserialized) + }) + } +} diff --git a/gateway/controller/cancel.go b/gateway/controller/cancel.go index ab372b11..4f58029c 100644 --- a/gateway/controller/cancel.go +++ b/gateway/controller/cancel.go @@ -6,6 +6,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" extqueue "github.com/uber/submitqueue/extension/queue" pb "github.com/uber/submitqueue/gateway/protopb" @@ -40,9 +41,25 @@ func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (r sqid := req.Sqid - // TODO: Insert the request to the event store + if sqid == "" { + return &pb.CancelResponse{ + Error: &pb.Error{Message: "sqid is required"}, + }, nil + } + + // TODO - + // Look up event store to see if sqid exists - + // - if found and request is already in a terminal state; return cancellation failed with appropriate + // error message. + // - if found and request is not in a terminal state; publish a cancel entity onto the cancel topic + // to be picked up by the orchestrator. + // - if not found; publish a cancel entity onto the cancel topic + // to be picked up by the orchestrator. + cancel := entity.Cancel{ + Sqid: sqid, + } - if err := c.publishToQueue(ctx, sqid); err != nil { + if err := c.publishToQueue(ctx, cancel); err != nil { c.logger.Errorw("failed to publish cancel request to queue", "sqid", sqid, "error", err, @@ -56,19 +73,23 @@ func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (r ) return &pb.CancelResponse{ - Sqid: sqid, + Sqid: sqid, + CurrentStatus: pb.RequestStatus_CANCELLATION_ACCEPTED, }, nil } -// publishToQueue publishes a cancel request to the cancel queue for async processing. -func (c *CancelController) publishToQueue(ctx context.Context, sqid string) error { - payload := []byte(sqid) +// publishToQueue publishes a cancel entity to the cancel queue for async processing. +func (c *CancelController) publishToQueue(ctx context.Context, cancel entity.Cancel) error { + payload, err := cancel.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize cancel entity: %w", err) + } // Create queue message // - Message ID: sqid for idempotency - // - Payload: sqid as bytes - // - Partition key: sqid (ensures ordering per request) - msg := queue.NewMessage(sqid, payload, sqid, nil) + // - Payload: serialized Cancel entity + // - Partition key: empty (no inherent ordering for cancel requests) + msg := queue.NewMessage(cancel.Sqid, payload, "", nil) if err := c.publisher.Publish(ctx, c.topic, msg); err != nil { return fmt.Errorf("failed to publish message: %w", err) diff --git a/gateway/controller/cancel_test.go b/gateway/controller/cancel_test.go index 17930688..00496c24 100644 --- a/gateway/controller/cancel_test.go +++ b/gateway/controller/cancel_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" pb "github.com/uber/submitqueue/gateway/protopb" @@ -21,6 +22,21 @@ func TestNewCancelController(t *testing.T) { require.NotNil(t, controller) } +func TestCancel_ReturnsErrorOnEmptySqid(t *testing.T) { + ctrl := gomock.NewController(t) + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, noopPublisher(ctrl), "cancel") + ctx := context.Background() + + req := &pb.CancelRequest{Sqid: ""} + resp, err := controller.Cancel(ctx, req) + + require.NoError(t, err) + assert.Empty(t, resp.Sqid) + assert.Equal(t, pb.RequestStatus_UNKNOWN, resp.CurrentStatus) + require.NotNil(t, resp.Error) + assert.Equal(t, "sqid is required", resp.Error.Message) +} + func TestCancel_ReturnsSqid(t *testing.T) { ctrl := gomock.NewController(t) controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, noopPublisher(ctrl), "cancel") @@ -31,6 +47,7 @@ func TestCancel_ReturnsSqid(t *testing.T) { require.NoError(t, err) assert.Equal(t, "test-queue/123", resp.Sqid) + assert.Equal(t, pb.RequestStatus_CANCELLATION_ACCEPTED, resp.CurrentStatus) } func TestCancel_PublishesToQueue(t *testing.T) { @@ -55,12 +72,17 @@ func TestCancel_PublishesToQueue(t *testing.T) { require.NoError(t, err) assert.Equal(t, "my-queue/42", resp.Sqid) + assert.Equal(t, pb.RequestStatus_CANCELLATION_ACCEPTED, resp.CurrentStatus) // Verify message was published to the cancel topic assert.Equal(t, "cancel", publishedTopic) assert.Equal(t, "my-queue/42", publishedMessage.ID) assert.Equal(t, "my-queue/42", publishedMessage.PartitionKey) - assert.Equal(t, []byte("my-queue/42"), publishedMessage.Payload) + + // Verify payload is a serialized Cancel entity + cancel, err := entity.CancelFromBytes(publishedMessage.Payload) + require.NoError(t, err) + assert.Equal(t, "my-queue/42", cancel.Sqid) } func TestCancel_ReturnsErrorOnPublishFailure(t *testing.T) { diff --git a/gateway/proto/gateway.proto b/gateway/proto/gateway.proto index 409a553e..c80229f8 100644 --- a/gateway/proto/gateway.proto +++ b/gateway/proto/gateway.proto @@ -79,10 +79,32 @@ message CancelRequest { string sqid = 1; } +// RequestStatus defines the possible states of a land request. +enum RequestStatus { + // Unknown status. + UNKNOWN = 0; + // Initial state of a land request. Confirmed by the system but processing has not started. + NEW = 1; + // The request has been validated (duplicate check, merge check etc.) successfully. + VALIDATED = 2; + // The request is being processed. + PROCESSING = 3; + // The request has been successfully processed and landed. Terminal state. + LANDED = 4; + // The cancellation request has been accepted and is being processed. + CANCELLATION_ACCEPTED = 5; + // The request has been cancelled. Terminal state. + CANCELLED = 6; +} + // CancelResponse defines the response to a cancel request. message CancelResponse { - // Globally unique identifier for the land request that was cancelled. + // Globally unique identifier for the land request that was requested to be cancelled. string sqid = 1; + // Current status of the land request at the time of cancellation request. + RequestStatus current_status = 2; + // Optional error describing why the cancellation could not be completed. + Error error = 3; } // *************** @@ -116,7 +138,11 @@ service SubmitQueueGateway { // The processing is asynchronous and returns a LandResponse immediately. The land request is processed in the background. rpc Land(LandRequest) returns (LandResponse) {} - // Cancel cancels a previously submitted land request. The cancellation is asynchronous and - // published to a queue for processing. No validation is performed on whether the request exists. + // Cancel requests cancellation of a previously submitted land request. + // The cancellation is asynchronous and published to a queue for + // processing. Note that cancellation is best effort and not guaranteed. + // To check for the status of the request, use Status API (TODO). A + // "Cancelled" status indicates that the requested cancellation was + // successful. rpc Cancel(CancelRequest) returns (CancelResponse) {} } diff --git a/gateway/protopb/gateway.pb.go b/gateway/protopb/gateway.pb.go index 571e4298..646508d3 100644 --- a/gateway/protopb/gateway.pb.go +++ b/gateway/protopb/gateway.pb.go @@ -78,6 +78,75 @@ func (Strategy) EnumDescriptor() ([]byte, []int) { return file_gateway_proto_rawDescGZIP(), []int{0} } +// RequestStatus defines the possible states of a land request. +type RequestStatus int32 + +const ( + // Unknown status. Should never be seen in the system. + RequestStatus_UNKNOWN RequestStatus = 0 + // Initial state of a land request. Confirmed by the system but processing has not started. + RequestStatus_NEW RequestStatus = 1 + // The request has been validated (duplicate check, merge check etc.) successfully. + RequestStatus_VALIDATED RequestStatus = 2 + // The request is being processed. + RequestStatus_PROCESSING RequestStatus = 3 + // The request has been successfully processed and landed. Terminal state. + RequestStatus_LANDED RequestStatus = 4 + // The cancellation request has been accepted and is being processed. + RequestStatus_CANCELLATION_ACCEPTED RequestStatus = 5 + // The request has been cancelled. Terminal state. + RequestStatus_CANCELLED RequestStatus = 6 +) + +// Enum value maps for RequestStatus. +var ( + RequestStatus_name = map[int32]string{ + 0: "UNKNOWN", + 1: "NEW", + 2: "VALIDATED", + 3: "PROCESSING", + 4: "LANDED", + 5: "CANCELLATION_ACCEPTED", + 6: "CANCELLED", + } + RequestStatus_value = map[string]int32{ + "UNKNOWN": 0, + "NEW": 1, + "VALIDATED": 2, + "PROCESSING": 3, + "LANDED": 4, + "CANCELLATION_ACCEPTED": 5, + "CANCELLED": 6, + } +) + +func (x RequestStatus) Enum() *RequestStatus { + p := new(RequestStatus) + *p = x + return p +} + +func (x RequestStatus) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (RequestStatus) Descriptor() protoreflect.EnumDescriptor { + return file_gateway_proto_enumTypes[1].Descriptor() +} + +func (RequestStatus) Type() protoreflect.EnumType { + return &file_gateway_proto_enumTypes[1] +} + +func (x RequestStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use RequestStatus.Descriptor instead. +func (RequestStatus) EnumDescriptor() ([]byte, []int) { + return file_gateway_proto_rawDescGZIP(), []int{1} +} + // PingRequest is the request for the Ping method type PingRequest struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -414,7 +483,11 @@ func (x *CancelRequest) GetSqid() string { type CancelResponse struct { state protoimpl.MessageState `protogen:"open.v1"` // Globally unique identifier for the land request that was cancelled. - Sqid string `protobuf:"bytes,1,opt,name=sqid,proto3" json:"sqid,omitempty"` + Sqid string `protobuf:"bytes,1,opt,name=sqid,proto3" json:"sqid,omitempty"` + // Current status of the land request at the time of cancellation. + CurrentStatus RequestStatus `protobuf:"varint,2,opt,name=current_status,json=currentStatus,proto3,enum=uber.devexp.submitqueue.gateway.RequestStatus" json:"current_status,omitempty"` + // Optional error describing why the cancellation could not be completed. + Error *Error `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -456,6 +529,20 @@ func (x *CancelResponse) GetSqid() string { return "" } +func (x *CancelResponse) GetCurrentStatus() RequestStatus { + if x != nil { + return x.CurrentStatus + } + return RequestStatus_UNKNOWN +} + +func (x *CancelResponse) GetError() *Error { + if x != nil { + return x.Error + } + return nil +} + // Generic error with metadata. Each custom error type should extend this message. type Error struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -578,9 +665,11 @@ const file_gateway_proto_rawDesc = "" + "\fLandResponse\x12\x12\n" + "\x04sqid\x18\x01 \x01(\tR\x04sqid\"#\n" + "\rCancelRequest\x12\x12\n" + - "\x04sqid\x18\x01 \x01(\tR\x04sqid\"$\n" + + "\x04sqid\x18\x01 \x01(\tR\x04sqid\"\xb9\x01\n" + "\x0eCancelResponse\x12\x12\n" + - "\x04sqid\x18\x01 \x01(\tR\x04sqid\"!\n" + + "\x04sqid\x18\x01 \x01(\tR\x04sqid\x12U\n" + + "\x0ecurrent_status\x18\x02 \x01(\x0e2..uber.devexp.submitqueue.gateway.RequestStatusR\rcurrentStatus\x12<\n" + + "\x05error\x18\x03 \x01(\v2&.uber.devexp.submitqueue.gateway.ErrorR\x05error\"!\n" + "\x05Error\x12\x18\n" + "\amessage\x18\x01 \x01(\tR\amessage\"l\n" + "\x16UnrecognizedQueueError\x12<\n" + @@ -591,7 +680,17 @@ const file_gateway_proto_rawDesc = "" + "\n" + "\x06REBASE\x10\x01\x12\x11\n" + "\rSQUASH_REBASE\x10\x02\x12\t\n" + - "\x05MERGE\x10\x032\xcf\x02\n" + + "\x05MERGE\x10\x03*z\n" + + "\rRequestStatus\x12\v\n" + + "\aUNKNOWN\x10\x00\x12\a\n" + + "\x03NEW\x10\x01\x12\r\n" + + "\tVALIDATED\x10\x02\x12\x0e\n" + + "\n" + + "PROCESSING\x10\x03\x12\n" + + "\n" + + "\x06LANDED\x10\x04\x12\x19\n" + + "\x15CANCELLATION_ACCEPTED\x10\x05\x12\r\n" + + "\tCANCELLED\x10\x062\xcf\x02\n" + "\x12SubmitQueueGateway\x12e\n" + "\x04Ping\x12,.uber.devexp.submitqueue.gateway.PingRequest\x1a-.uber.devexp.submitqueue.gateway.PingResponse\"\x00\x12e\n" + "\x04Land\x12,.uber.devexp.submitqueue.gateway.LandRequest\x1a-.uber.devexp.submitqueue.gateway.LandResponse\"\x00\x12k\n" + @@ -610,35 +709,38 @@ func file_gateway_proto_rawDescGZIP() []byte { return file_gateway_proto_rawDescData } -var file_gateway_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_gateway_proto_enumTypes = make([]protoimpl.EnumInfo, 2) var file_gateway_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_gateway_proto_goTypes = []any{ (Strategy)(0), // 0: uber.devexp.submitqueue.gateway.Strategy - (*PingRequest)(nil), // 1: uber.devexp.submitqueue.gateway.PingRequest - (*PingResponse)(nil), // 2: uber.devexp.submitqueue.gateway.PingResponse - (*Change)(nil), // 3: uber.devexp.submitqueue.gateway.Change - (*LandRequest)(nil), // 4: uber.devexp.submitqueue.gateway.LandRequest - (*LandResponse)(nil), // 5: uber.devexp.submitqueue.gateway.LandResponse - (*CancelRequest)(nil), // 6: uber.devexp.submitqueue.gateway.CancelRequest - (*CancelResponse)(nil), // 7: uber.devexp.submitqueue.gateway.CancelResponse - (*Error)(nil), // 8: uber.devexp.submitqueue.gateway.Error - (*UnrecognizedQueueError)(nil), // 9: uber.devexp.submitqueue.gateway.UnrecognizedQueueError + (RequestStatus)(0), // 1: uber.devexp.submitqueue.gateway.RequestStatus + (*PingRequest)(nil), // 2: uber.devexp.submitqueue.gateway.PingRequest + (*PingResponse)(nil), // 3: uber.devexp.submitqueue.gateway.PingResponse + (*Change)(nil), // 4: uber.devexp.submitqueue.gateway.Change + (*LandRequest)(nil), // 5: uber.devexp.submitqueue.gateway.LandRequest + (*LandResponse)(nil), // 6: uber.devexp.submitqueue.gateway.LandResponse + (*CancelRequest)(nil), // 7: uber.devexp.submitqueue.gateway.CancelRequest + (*CancelResponse)(nil), // 8: uber.devexp.submitqueue.gateway.CancelResponse + (*Error)(nil), // 9: uber.devexp.submitqueue.gateway.Error + (*UnrecognizedQueueError)(nil), // 10: uber.devexp.submitqueue.gateway.UnrecognizedQueueError } var file_gateway_proto_depIdxs = []int32{ - 3, // 0: uber.devexp.submitqueue.gateway.LandRequest.change:type_name -> uber.devexp.submitqueue.gateway.Change + 4, // 0: uber.devexp.submitqueue.gateway.LandRequest.change:type_name -> uber.devexp.submitqueue.gateway.Change 0, // 1: uber.devexp.submitqueue.gateway.LandRequest.strategy:type_name -> uber.devexp.submitqueue.gateway.Strategy - 8, // 2: uber.devexp.submitqueue.gateway.UnrecognizedQueueError.error:type_name -> uber.devexp.submitqueue.gateway.Error - 1, // 3: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Ping:input_type -> uber.devexp.submitqueue.gateway.PingRequest - 4, // 4: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Land:input_type -> uber.devexp.submitqueue.gateway.LandRequest - 6, // 5: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Cancel:input_type -> uber.devexp.submitqueue.gateway.CancelRequest - 2, // 6: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Ping:output_type -> uber.devexp.submitqueue.gateway.PingResponse - 5, // 7: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Land:output_type -> uber.devexp.submitqueue.gateway.LandResponse - 7, // 8: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Cancel:output_type -> uber.devexp.submitqueue.gateway.CancelResponse - 6, // [6:9] is the sub-list for method output_type - 3, // [3:6] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 1, // 2: uber.devexp.submitqueue.gateway.CancelResponse.current_status:type_name -> uber.devexp.submitqueue.gateway.RequestStatus + 9, // 3: uber.devexp.submitqueue.gateway.CancelResponse.error:type_name -> uber.devexp.submitqueue.gateway.Error + 9, // 4: uber.devexp.submitqueue.gateway.UnrecognizedQueueError.error:type_name -> uber.devexp.submitqueue.gateway.Error + 2, // 5: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Ping:input_type -> uber.devexp.submitqueue.gateway.PingRequest + 5, // 6: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Land:input_type -> uber.devexp.submitqueue.gateway.LandRequest + 7, // 7: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Cancel:input_type -> uber.devexp.submitqueue.gateway.CancelRequest + 3, // 8: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Ping:output_type -> uber.devexp.submitqueue.gateway.PingResponse + 6, // 9: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Land:output_type -> uber.devexp.submitqueue.gateway.LandResponse + 8, // 10: uber.devexp.submitqueue.gateway.SubmitQueueGateway.Cancel:output_type -> uber.devexp.submitqueue.gateway.CancelResponse + 8, // [8:11] is the sub-list for method output_type + 5, // [5:8] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name } func init() { file_gateway_proto_init() } @@ -651,7 +753,7 @@ func file_gateway_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_gateway_proto_rawDesc), len(file_gateway_proto_rawDesc)), - NumEnums: 1, + NumEnums: 2, NumMessages: 9, NumExtensions: 0, NumServices: 1, diff --git a/gateway/protopb/gateway.pb.yarpc.go b/gateway/protopb/gateway.pb.yarpc.go index ca8b152c..dc18521d 100644 --- a/gateway/protopb/gateway.pb.yarpc.go +++ b/gateway/protopb/gateway.pb.yarpc.go @@ -333,39 +333,47 @@ var ( var yarpcFileDescriptorClosuref1a937782ebbded5 = [][]byte{ // gateway.proto []byte{ - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x94, 0xcb, 0x6e, 0xd3, 0x4c, - 0x14, 0xc7, 0xe3, 0x5c, 0xdc, 0xe4, 0x38, 0xa9, 0xf2, 0x8d, 0x3e, 0x21, 0x2b, 0xaa, 0x44, 0xea, - 0x22, 0x1a, 0x6e, 0xb6, 0x14, 0xb6, 0x48, 0x28, 0x29, 0xa6, 0x2c, 0x0a, 0x4a, 0x6d, 0xb2, 0x61, - 0x53, 0x8d, 0x9d, 0x23, 0xc7, 0xa2, 0xbe, 0xc4, 0x33, 0x2e, 0x94, 0x3d, 0x4f, 0xc3, 0xc3, 0xf0, - 0x4a, 0xc8, 0xe3, 0x71, 0xeb, 0x05, 0xd4, 0xd9, 0x9d, 0x19, 0x9f, 0xdf, 0xfc, 0xff, 0xe7, 0x22, - 0xc3, 0x28, 0xa0, 0x1c, 0xbf, 0xd1, 0x5b, 0x33, 0xcd, 0x12, 0x9e, 0x90, 0xc7, 0xb9, 0x87, 0x99, - 0xb9, 0xc1, 0x1b, 0xfc, 0x9e, 0x9a, 0x2c, 0xf7, 0xa2, 0x90, 0xef, 0x72, 0xcc, 0xd1, 0x94, 0x69, - 0xc6, 0x29, 0x68, 0xab, 0x30, 0x0e, 0x1c, 0xdc, 0xe5, 0xc8, 0x38, 0xd1, 0xe1, 0x20, 0x42, 0xc6, - 0x68, 0x80, 0xba, 0x32, 0x55, 0x66, 0x03, 0xa7, 0x3a, 0x1a, 0x3f, 0x15, 0x18, 0x96, 0x99, 0x2c, - 0x4d, 0x62, 0x86, 0xff, 0x4e, 0x25, 0xc7, 0x30, 0x64, 0x98, 0xdd, 0x84, 0x3e, 0x5e, 0xc5, 0x34, - 0x42, 0xbd, 0x2d, 0x3e, 0x6b, 0xf2, 0xee, 0x13, 0x8d, 0x90, 0x1c, 0xc1, 0x80, 0x87, 0x11, 0x32, - 0x4e, 0xa3, 0x54, 0xef, 0x4c, 0x95, 0x59, 0xc7, 0xb9, 0xbf, 0x20, 0x13, 0xe8, 0x6f, 0x13, 0xc6, - 0x05, 0xdc, 0x15, 0xf0, 0xdd, 0xd9, 0x38, 0x02, 0xf5, 0x6c, 0x4b, 0xe3, 0x00, 0x09, 0x81, 0x6e, - 0x9e, 0x85, 0x4c, 0x57, 0xa6, 0x9d, 0xd9, 0xc0, 0x11, 0xb1, 0xf1, 0x4b, 0x01, 0xed, 0x82, 0xc6, - 0x9b, 0xaa, 0x9e, 0xff, 0xa1, 0x27, 0xea, 0x95, 0x16, 0xcb, 0x03, 0x79, 0x0b, 0xaa, 0x2f, 0xde, - 0x10, 0xd6, 0xb4, 0xf9, 0xa9, 0xd9, 0xd0, 0x26, 0xb3, 0x94, 0x74, 0x24, 0x46, 0x6c, 0xe8, 0x33, - 0x9e, 0x51, 0x8e, 0xc1, 0xad, 0x30, 0x78, 0x38, 0x7f, 0xd6, 0xf8, 0x84, 0x2b, 0x01, 0xe7, 0x0e, - 0x35, 0x0c, 0x18, 0x96, 0x66, 0x65, 0x4b, 0x09, 0x74, 0xd9, 0x2e, 0xdc, 0x48, 0xb3, 0x22, 0x36, - 0x4e, 0x60, 0x74, 0x46, 0x63, 0x1f, 0xaf, 0xab, 0x92, 0xfe, 0x96, 0xf4, 0x04, 0x0e, 0xab, 0xa4, - 0x07, 0x9e, 0x3a, 0x86, 0x9e, 0x9d, 0x65, 0x49, 0xf6, 0xc0, 0x94, 0xaf, 0xe1, 0xd1, 0x3a, 0xce, - 0xd0, 0x4f, 0x82, 0x38, 0xfc, 0x81, 0x9b, 0xcb, 0xa2, 0x82, 0x92, 0x79, 0x03, 0x3d, 0x2c, 0x02, - 0x41, 0x68, 0xf3, 0xa7, 0x8d, 0xf5, 0x0a, 0xcc, 0x29, 0xa1, 0xfb, 0x39, 0xb4, 0x6b, 0x73, 0x78, - 0xbe, 0x80, 0x7e, 0xd5, 0x15, 0xa2, 0xc1, 0xc1, 0x3b, 0xfb, 0xfd, 0x62, 0x7d, 0xf1, 0x79, 0xdc, - 0x22, 0x00, 0xaa, 0x63, 0x2f, 0x17, 0xae, 0x3d, 0x56, 0xc8, 0x7f, 0x30, 0x72, 0x2f, 0xd7, 0x0b, - 0xf7, 0xc3, 0x95, 0xbc, 0x6a, 0x93, 0x01, 0xf4, 0x3e, 0xda, 0xce, 0xb9, 0x3d, 0xee, 0xcc, 0x7f, - 0xb7, 0x81, 0xb8, 0x42, 0x5d, 0x78, 0x3d, 0x2f, 0xc5, 0x09, 0x42, 0xb7, 0x58, 0x56, 0xf2, 0xb2, - 0xd1, 0x66, 0x6d, 0xfb, 0x27, 0xaf, 0xf6, 0xcc, 0x2e, 0x7b, 0x6c, 0xb4, 0x0a, 0x99, 0x62, 0x80, - 0x7b, 0xc8, 0xd4, 0x96, 0x72, 0x0f, 0x99, 0xfa, 0x56, 0x18, 0x2d, 0xf2, 0x15, 0xd4, 0x72, 0xbc, - 0xc4, 0x6c, 0xde, 0xd4, 0xfa, 0xb2, 0x4c, 0xac, 0xbd, 0xf3, 0x2b, 0xb1, 0xa5, 0x07, 0x27, 0x7e, - 0x12, 0x35, 0x71, 0xcb, 0xa1, 0x6c, 0xf5, 0xaa, 0xf8, 0xcf, 0xac, 0x94, 0x2f, 0x2f, 0x82, 0x90, - 0x6f, 0x73, 0xcf, 0xf4, 0x93, 0xc8, 0x2a, 0x58, 0xab, 0x06, 0x59, 0x12, 0xb2, 0xc4, 0x4f, 0x29, - 0xf5, 0x3c, 0x55, 0x04, 0xaf, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0xf4, 0xdb, 0xc5, 0xbd, 0xae, - 0x04, 0x00, 0x00, + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xdb, 0x6e, 0xd3, 0x4c, + 0x10, 0x8e, 0x73, 0x6a, 0x33, 0x39, 0xc8, 0xff, 0xe8, 0x07, 0x85, 0xaa, 0x12, 0xad, 0x2b, 0xd1, + 0x52, 0xc0, 0x91, 0xc2, 0x2d, 0x12, 0x72, 0x9d, 0xa5, 0x54, 0x04, 0x37, 0xb5, 0x1b, 0x2a, 0x71, + 0x13, 0x39, 0xce, 0xca, 0xb5, 0xa8, 0xed, 0xd4, 0xbb, 0x2e, 0xb4, 0xf7, 0x3c, 0x0d, 0x4f, 0xc0, + 0x53, 0xf0, 0x4a, 0xc8, 0xeb, 0x4d, 0x9b, 0x4a, 0x14, 0x47, 0xdc, 0xed, 0xac, 0xbf, 0xc3, 0xcc, + 0x78, 0x66, 0xa1, 0xed, 0xbb, 0x9c, 0x7e, 0x75, 0xaf, 0xf5, 0x79, 0x12, 0xf3, 0x18, 0x9f, 0xa6, + 0x53, 0x9a, 0xe8, 0x33, 0x7a, 0x45, 0xbf, 0xcd, 0x75, 0x96, 0x4e, 0xc3, 0x80, 0x5f, 0xa6, 0x34, + 0xa5, 0xba, 0x84, 0x69, 0xbb, 0xd0, 0x1c, 0x05, 0x91, 0x6f, 0xd3, 0xcb, 0x94, 0x32, 0x8e, 0x5d, + 0x58, 0x0b, 0x29, 0x63, 0xae, 0x4f, 0xbb, 0xca, 0x96, 0xb2, 0xd7, 0xb0, 0x17, 0xa1, 0xf6, 0x5d, + 0x81, 0x56, 0x8e, 0x64, 0xf3, 0x38, 0x62, 0xf4, 0x61, 0x28, 0x6e, 0x43, 0x8b, 0xd1, 0xe4, 0x2a, + 0xf0, 0xe8, 0x24, 0x72, 0x43, 0xda, 0x2d, 0x8b, 0xcf, 0x4d, 0x79, 0x67, 0xb9, 0x21, 0xc5, 0x4d, + 0x68, 0xf0, 0x20, 0xa4, 0x8c, 0xbb, 0xe1, 0xbc, 0x5b, 0xd9, 0x52, 0xf6, 0x2a, 0xf6, 0xdd, 0x05, + 0x6e, 0xc0, 0xfa, 0x79, 0xcc, 0xb8, 0x20, 0x57, 0x05, 0xf9, 0x36, 0xd6, 0x36, 0xa1, 0x6e, 0x9e, + 0xbb, 0x91, 0x4f, 0x11, 0xa1, 0x9a, 0x26, 0x01, 0xeb, 0x2a, 0x5b, 0x95, 0xbd, 0x86, 0x2d, 0xce, + 0xda, 0x0f, 0x05, 0x9a, 0x43, 0x37, 0x9a, 0x2d, 0xea, 0xf9, 0x1f, 0x6a, 0xa2, 0x5e, 0x99, 0x62, + 0x1e, 0xe0, 0x5b, 0xa8, 0x7b, 0x42, 0x43, 0xa4, 0xd6, 0xec, 0xef, 0xea, 0x05, 0x6d, 0xd2, 0x73, + 0x4b, 0x5b, 0xd2, 0x90, 0xc0, 0x3a, 0xe3, 0x89, 0xcb, 0xa9, 0x7f, 0x2d, 0x12, 0xec, 0xf4, 0x9f, + 0x17, 0x4a, 0x38, 0x92, 0x60, 0xdf, 0x52, 0x35, 0x0d, 0x5a, 0x79, 0xb2, 0xb2, 0xa5, 0x08, 0x55, + 0x76, 0x19, 0xcc, 0x64, 0xb2, 0xe2, 0xac, 0xed, 0x40, 0xdb, 0x74, 0x23, 0x8f, 0x5e, 0x2c, 0x4a, + 0xfa, 0x13, 0xe8, 0xa7, 0x02, 0x9d, 0x05, 0xea, 0x61, 0x2d, 0x1c, 0x43, 0xc7, 0x4b, 0x93, 0x84, + 0x46, 0x7c, 0xc2, 0xb8, 0xcb, 0x53, 0x26, 0xea, 0xef, 0xf4, 0xf5, 0xc2, 0xe4, 0xa5, 0xb9, 0x23, + 0x58, 0x76, 0x5b, 0xaa, 0xe4, 0x21, 0xbe, 0x81, 0x1a, 0x4d, 0x92, 0x38, 0x11, 0x3f, 0xb2, 0xd9, + 0x7f, 0x56, 0xa8, 0x46, 0x32, 0xb4, 0x9d, 0x93, 0xb4, 0x6d, 0xa8, 0x89, 0xf8, 0x2f, 0xb3, 0x77, + 0x01, 0x8f, 0xc7, 0x51, 0x42, 0xbd, 0xd8, 0x8f, 0x82, 0x1b, 0x3a, 0x3b, 0xc9, 0xc4, 0x72, 0xce, + 0xad, 0xb5, 0xf2, 0x0f, 0xd6, 0x77, 0xd3, 0x51, 0x5e, 0x9a, 0x8e, 0x7d, 0x03, 0xd6, 0x17, 0xff, + 0x0a, 0x9b, 0xb0, 0x36, 0x20, 0xef, 0x8c, 0xf1, 0xf0, 0x54, 0x2d, 0x21, 0x40, 0xdd, 0x26, 0x07, + 0x86, 0x43, 0x54, 0x05, 0xff, 0x83, 0xb6, 0x73, 0x32, 0x36, 0x9c, 0xf7, 0x13, 0x79, 0x55, 0xc6, + 0x06, 0xd4, 0x3e, 0x12, 0xfb, 0x90, 0xa8, 0x95, 0xfd, 0x1b, 0x68, 0xdf, 0xeb, 0x58, 0xa6, 0x33, + 0xb6, 0x3e, 0x58, 0xc7, 0x67, 0x96, 0x5a, 0xc2, 0x35, 0xa8, 0x58, 0xe4, 0x4c, 0x55, 0xb0, 0x0d, + 0x8d, 0x4f, 0xc6, 0xf0, 0x68, 0x60, 0x9c, 0x92, 0x81, 0x5a, 0xc6, 0x0e, 0xc0, 0xc8, 0x3e, 0x36, + 0x89, 0xe3, 0x1c, 0x59, 0x87, 0x6a, 0x25, 0xf3, 0x1b, 0x1a, 0xd6, 0x80, 0x0c, 0xd4, 0x2a, 0x3e, + 0x81, 0x47, 0xa6, 0x61, 0x99, 0x64, 0x38, 0x34, 0x4e, 0x8f, 0x8e, 0xad, 0x89, 0x61, 0x9a, 0x64, + 0x94, 0xd1, 0x6a, 0x99, 0x8a, 0xfc, 0x44, 0x06, 0x6a, 0xbd, 0xff, 0xab, 0x0c, 0xe8, 0x88, 0xca, + 0x45, 0x9f, 0x0e, 0xf3, 0xc2, 0x91, 0x42, 0x35, 0x5b, 0x5f, 0x7c, 0x59, 0xd8, 0xa2, 0xa5, 0xf7, + 0x60, 0xe3, 0xd5, 0x8a, 0xe8, 0x7c, 0xe8, 0xb4, 0x52, 0x66, 0x93, 0x8d, 0xf4, 0x0a, 0x36, 0x4b, + 0x6b, 0xba, 0x82, 0xcd, 0xf2, 0x9e, 0x68, 0x25, 0xfc, 0x02, 0xf5, 0x7c, 0xde, 0xb1, 0x78, 0x76, + 0xef, 0xad, 0xcf, 0x46, 0x6f, 0x65, 0xfc, 0xc2, 0xec, 0x60, 0x0a, 0x3b, 0x5e, 0x1c, 0x16, 0xf1, + 0x0e, 0x5a, 0xb2, 0xd5, 0xa3, 0xec, 0xe5, 0x1d, 0x29, 0x9f, 0x5f, 0xf8, 0x01, 0x3f, 0x4f, 0xa7, + 0xba, 0x17, 0x87, 0xbd, 0x8c, 0xdb, 0x5b, 0x22, 0xf5, 0x24, 0xa9, 0x27, 0x9e, 0xe9, 0xf9, 0x74, + 0x5a, 0x17, 0x87, 0xd7, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xf7, 0x64, 0x71, 0x91, 0xc0, 0x05, + 0x00, 0x00, }, } diff --git a/gateway/protopb/gateway_grpc.pb.go b/gateway/protopb/gateway_grpc.pb.go index 695ba3bc..5eef2770 100644 --- a/gateway/protopb/gateway_grpc.pb.go +++ b/gateway/protopb/gateway_grpc.pb.go @@ -35,8 +35,12 @@ type SubmitQueueGatewayClient interface { // Land lands a set of code changes into a target branch, performing the necessary validations across all other changes in the queue. // The processing is asynchronous and returns a LandResponse immediately. The land request is processed in the background. Land(ctx context.Context, in *LandRequest, opts ...grpc.CallOption) (*LandResponse, error) - // Cancel cancels a previously submitted land request. The cancellation is asynchronous and - // published to a queue for processing. No validation is performed on whether the request exists. + // Cancel requests cancellation of a previously submitted land request. + // The cancellation is asynchronous and published to a queue for + // processing. Note that cancellation is best effort and not guaranteed. + // To check for the status of the request, use Status API (TODO). A + // "Cancelled" status indicates that the requested cancellation was + // successful. Cancel(ctx context.Context, in *CancelRequest, opts ...grpc.CallOption) (*CancelResponse, error) } @@ -89,8 +93,12 @@ type SubmitQueueGatewayServer interface { // Land lands a set of code changes into a target branch, performing the necessary validations across all other changes in the queue. // The processing is asynchronous and returns a LandResponse immediately. The land request is processed in the background. Land(context.Context, *LandRequest) (*LandResponse, error) - // Cancel cancels a previously submitted land request. The cancellation is asynchronous and - // published to a queue for processing. No validation is performed on whether the request exists. + // Cancel requests cancellation of a previously submitted land request. + // The cancellation is asynchronous and published to a queue for + // processing. Note that cancellation is best effort and not guaranteed. + // To check for the status of the request, use Status API (TODO). A + // "Cancelled" status indicates that the requested cancellation was + // successful. Cancel(context.Context, *CancelRequest) (*CancelResponse, error) mustEmbedUnimplementedSubmitQueueGatewayServer() }