-
Notifications
You must be signed in to change notification settings - Fork 0
feat(controller) Add and scaffold Cancel API in gateway #114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| package entity | ||
|
|
||
| import "encoding/json" | ||
|
|
||
| // Cancel represents a request to cancel a previously submitted land request. | ||
| // The object is immutable after creation. | ||
| type Cancel struct { | ||
| // Sqid is the globally unique identifier of the land request to cancel. | ||
| Sqid string `json:"sqid"` | ||
| } | ||
|
|
||
| // ToBytes serializes the Cancel to JSON bytes for queue message payload. | ||
| func (c Cancel) ToBytes() ([]byte, error) { | ||
| return json.Marshal(c) | ||
| } | ||
|
|
||
| // CancelFromBytes deserializes a Cancel from JSON bytes. | ||
| func CancelFromBytes(data []byte) (Cancel, error) { | ||
| var c Cancel | ||
| err := json.Unmarshal(data, &c) | ||
| return c, err | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| package entity | ||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func TestCancel_ToBytes(t *testing.T) { | ||
| cancel := Cancel{ | ||
| Sqid: "test-queue/123", | ||
| } | ||
|
|
||
| data, err := cancel.ToBytes() | ||
| require.NoError(t, err) | ||
| assert.Contains(t, string(data), "test-queue/123") | ||
| } | ||
|
|
||
| func TestCancelFromBytes(t *testing.T) { | ||
| original := Cancel{ | ||
| Sqid: "my-queue/999", | ||
| } | ||
|
|
||
| data, err := original.ToBytes() | ||
| require.NoError(t, err) | ||
|
|
||
| deserialized, err := CancelFromBytes(data) | ||
| require.NoError(t, err) | ||
| assert.Equal(t, original, deserialized) | ||
| } | ||
|
|
||
| func TestCancelFromBytes_InvalidJSON(t *testing.T) { | ||
| _, err := CancelFromBytes([]byte(`{"invalid": json"}`)) | ||
| assert.Error(t, err) | ||
| } | ||
|
|
||
| func TestCancel_SerializationRoundTrip(t *testing.T) { | ||
| tests := []struct { | ||
| name string | ||
| cancel Cancel | ||
| }{ | ||
| { | ||
| name: "standard sqid", | ||
| cancel: Cancel{Sqid: "queue1/100"}, | ||
| }, | ||
| { | ||
| name: "long sqid", | ||
| cancel: Cancel{Sqid: "my-long-queue-name/999999"}, | ||
| }, | ||
| } | ||
|
|
||
| for _, tt := range tests { | ||
| t.Run(tt.name, func(t *testing.T) { | ||
| data, err := tt.cancel.ToBytes() | ||
| require.NoError(t, err) | ||
|
|
||
| deserialized, err := CancelFromBytes(data) | ||
| require.NoError(t, err) | ||
| assert.Equal(t, tt.cancel, deserialized) | ||
| }) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| package controller | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "github.com/uber-go/tally/v4" | ||
| "github.com/uber/submitqueue/core/metrics" | ||
| "github.com/uber/submitqueue/entity" | ||
| "github.com/uber/submitqueue/entity/queue" | ||
| extqueue "github.com/uber/submitqueue/extension/queue" | ||
| pb "github.com/uber/submitqueue/gateway/protopb" | ||
| "go.uber.org/zap" | ||
| ) | ||
|
|
||
| // CancelController handles cancel business logic for the gateway | ||
| type CancelController struct { | ||
| logger *zap.SugaredLogger | ||
| metricsScope tally.Scope | ||
| publisher extqueue.Publisher | ||
| topic string // Topic to publish cancel requests to (e.g., "cancel") | ||
| } | ||
|
|
||
| // NewCancelController creates a new instance of the gateway cancel controller. | ||
| // topic: the queue topic to publish cancel requests to (e.g., "cancel") | ||
| func NewCancelController(logger *zap.SugaredLogger, scope tally.Scope, publisher extqueue.Publisher, topic string) *CancelController { | ||
| return &CancelController{ | ||
| logger: logger, | ||
| metricsScope: scope, | ||
| publisher: publisher, | ||
| topic: topic, | ||
| } | ||
| } | ||
|
|
||
| // Cancel handles the cancel request and returns a response. | ||
| // It publishes the sqid to the cancel topic for async processing. | ||
| // No validation is performed — the orchestrator handles that. | ||
| func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (resp *pb.CancelResponse, retErr error) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to future us (non blocking): we should discuss if controllers should receive proto structures directly. It seems that the final surfacing of the error should be a responsibility of a service, not a controller. |
||
| op := metrics.Begin(c.metricsScope, "cancel") | ||
| defer func() { op.Complete(retErr) }() | ||
|
|
||
| sqid := req.Sqid | ||
|
manjari25 marked this conversation as resolved.
|
||
|
|
||
| if sqid == "" { | ||
| return &pb.CancelResponse{ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it should be INVALID_ARGUMENT + BadMessage first class GRPC error
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The transformation from Go errors to GRPC errors is better to be done at a certain distinct function, means |
||
| Error: &pb.Error{Message: "sqid is required"}, | ||
| }, nil | ||
| } | ||
|
|
||
| // TODO - | ||
| // Look up event store to see if sqid exists - | ||
| // - if found and request is already in a terminal state; return cancellation failed with appropriate | ||
| // error message. | ||
| // - if found and request is not in a terminal state; publish a cancel entity onto the cancel topic | ||
| // to be picked up by the orchestrator. | ||
| // - if not found; publish a cancel entity onto the cancel topic | ||
| // to be picked up by the orchestrator. | ||
| cancel := entity.Cancel{ | ||
| Sqid: sqid, | ||
| } | ||
|
|
||
| if err := c.publishToQueue(ctx, cancel); err != nil { | ||
| c.logger.Errorw("failed to publish cancel request to queue", | ||
| "sqid", sqid, | ||
| "error", err, | ||
| ) | ||
| return nil, fmt.Errorf("CancelController failed to publish cancel request to queue: %w", err) | ||
| } | ||
|
|
||
| c.logger.Infow("cancel request published to queue", | ||
| "sqid", sqid, | ||
| "topic", c.topic, | ||
| ) | ||
|
|
||
| return &pb.CancelResponse{ | ||
| Sqid: sqid, | ||
| CurrentStatus: pb.RequestStatus_CANCELLATION_ACCEPTED, | ||
| }, nil | ||
| } | ||
|
|
||
| // publishToQueue publishes a cancel entity to the cancel queue for async processing. | ||
| func (c *CancelController) publishToQueue(ctx context.Context, cancel entity.Cancel) error { | ||
| payload, err := cancel.ToBytes() | ||
| if err != nil { | ||
| return fmt.Errorf("failed to serialize cancel entity: %w", err) | ||
| } | ||
|
|
||
| // Create queue message | ||
| // - Message ID: sqid for idempotency | ||
| // - Payload: serialized Cancel entity | ||
| // - Partition key: empty (no inherent ordering for cancel requests) | ||
| msg := queue.NewMessage(cancel.Sqid, payload, "", nil) | ||
|
|
||
| if err := c.publisher.Publish(ctx, c.topic, msg); err != nil { | ||
| return fmt.Errorf("failed to publish message: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| package controller | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| "github.com/uber-go/tally/v4" | ||
| "github.com/uber/submitqueue/entity" | ||
| "github.com/uber/submitqueue/entity/queue" | ||
| queuemock "github.com/uber/submitqueue/extension/queue/mock" | ||
| pb "github.com/uber/submitqueue/gateway/protopb" | ||
| "go.uber.org/mock/gomock" | ||
| "go.uber.org/zap" | ||
| ) | ||
|
|
||
| func TestNewCancelController(t *testing.T) { | ||
| ctrl := gomock.NewController(t) | ||
| controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, noopPublisher(ctrl), "cancel") | ||
| require.NotNil(t, controller) | ||
| } | ||
|
|
||
| func TestCancel_ReturnsErrorOnEmptySqid(t *testing.T) { | ||
| ctrl := gomock.NewController(t) | ||
| controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, noopPublisher(ctrl), "cancel") | ||
| ctx := context.Background() | ||
|
|
||
| req := &pb.CancelRequest{Sqid: ""} | ||
| resp, err := controller.Cancel(ctx, req) | ||
|
|
||
| require.NoError(t, err) | ||
| assert.Empty(t, resp.Sqid) | ||
| assert.Equal(t, pb.RequestStatus_UNKNOWN, resp.CurrentStatus) | ||
| require.NotNil(t, resp.Error) | ||
| assert.Equal(t, "sqid is required", resp.Error.Message) | ||
| } | ||
|
|
||
| func TestCancel_ReturnsSqid(t *testing.T) { | ||
| ctrl := gomock.NewController(t) | ||
| controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, noopPublisher(ctrl), "cancel") | ||
| ctx := context.Background() | ||
|
|
||
| req := &pb.CancelRequest{Sqid: "test-queue/123"} | ||
| resp, err := controller.Cancel(ctx, req) | ||
|
|
||
| require.NoError(t, err) | ||
| assert.Equal(t, "test-queue/123", resp.Sqid) | ||
| assert.Equal(t, pb.RequestStatus_CANCELLATION_ACCEPTED, resp.CurrentStatus) | ||
| } | ||
|
|
||
| func TestCancel_PublishesToQueue(t *testing.T) { | ||
| var publishedTopic string | ||
| var publishedMessage queue.Message | ||
|
|
||
| ctrl := gomock.NewController(t) | ||
| publisher := queuemock.NewMockPublisher(ctrl) | ||
| publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( | ||
| func(ctx context.Context, topic string, msg queue.Message) error { | ||
| publishedTopic = topic | ||
| publishedMessage = msg | ||
| return nil | ||
| }, | ||
| ) | ||
|
|
||
| controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, publisher, "cancel") | ||
| ctx := context.Background() | ||
|
|
||
| req := &pb.CancelRequest{Sqid: "my-queue/42"} | ||
| resp, err := controller.Cancel(ctx, req) | ||
|
|
||
| require.NoError(t, err) | ||
| assert.Equal(t, "my-queue/42", resp.Sqid) | ||
| assert.Equal(t, pb.RequestStatus_CANCELLATION_ACCEPTED, resp.CurrentStatus) | ||
|
|
||
| // Verify message was published to the cancel topic | ||
| assert.Equal(t, "cancel", publishedTopic) | ||
| assert.Equal(t, "my-queue/42", publishedMessage.ID) | ||
| assert.Equal(t, "my-queue/42", publishedMessage.PartitionKey) | ||
|
|
||
| // Verify payload is a serialized Cancel entity | ||
| cancel, err := entity.CancelFromBytes(publishedMessage.Payload) | ||
| require.NoError(t, err) | ||
| assert.Equal(t, "my-queue/42", cancel.Sqid) | ||
| } | ||
|
|
||
| func TestCancel_ReturnsErrorOnPublishFailure(t *testing.T) { | ||
| ctrl := gomock.NewController(t) | ||
| publisher := queuemock.NewMockPublisher(ctrl) | ||
| publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable")) | ||
|
|
||
| controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, publisher, "cancel") | ||
| ctx := context.Background() | ||
|
|
||
| req := &pb.CancelRequest{Sqid: "test-queue/999"} | ||
| _, err := controller.Cancel(ctx, req) | ||
|
|
||
| require.Error(t, err) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mb
CancelRequest?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not critical