Skip to content

Commit 65738db

Browse files
committed
address comments
1 parent ff0ef20 commit 65738db

9 files changed

Lines changed: 352 additions & 78 deletions

File tree

entity/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ go_library(
66
"batch.go",
77
"batch_dependent.go",
88
"build.go",
9+
"cancel.go",
910
"change_provider.go",
1011
"queue_config.go",
1112
"request.go",
@@ -20,6 +21,7 @@ go_test(
2021
srcs = [
2122
"batch_test.go",
2223
"build_test.go",
24+
"cancel_test.go",
2325
"request_test.go",
2426
],
2527
embed = [":entity"],

entity/cancel.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package entity
2+
3+
import "encoding/json"
4+
5+
// Cancel represents a request to cancel a previously submitted land request.
6+
// The object is immutable after creation.
7+
type Cancel struct {
8+
// Sqid is the globally unique identifier of the land request to cancel.
9+
Sqid string `json:"sqid"`
10+
}
11+
12+
// ToBytes serializes the Cancel to JSON bytes for queue message payload.
13+
func (c Cancel) ToBytes() ([]byte, error) {
14+
return json.Marshal(c)
15+
}
16+
17+
// CancelFromBytes deserializes a Cancel from JSON bytes.
18+
func CancelFromBytes(data []byte) (Cancel, error) {
19+
var c Cancel
20+
err := json.Unmarshal(data, &c)
21+
return c, err
22+
}

entity/cancel_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package entity
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestCancel_ToBytes(t *testing.T) {
11+
cancel := Cancel{
12+
Sqid: "test-queue/123",
13+
}
14+
15+
data, err := cancel.ToBytes()
16+
require.NoError(t, err)
17+
assert.Contains(t, string(data), "test-queue/123")
18+
}
19+
20+
func TestCancelFromBytes(t *testing.T) {
21+
original := Cancel{
22+
Sqid: "my-queue/999",
23+
}
24+
25+
data, err := original.ToBytes()
26+
require.NoError(t, err)
27+
28+
deserialized, err := CancelFromBytes(data)
29+
require.NoError(t, err)
30+
assert.Equal(t, original, deserialized)
31+
}
32+
33+
func TestCancelFromBytes_InvalidJSON(t *testing.T) {
34+
_, err := CancelFromBytes([]byte(`{"invalid": json"}`))
35+
assert.Error(t, err)
36+
}
37+
38+
func TestCancel_SerializationRoundTrip(t *testing.T) {
39+
tests := []struct {
40+
name string
41+
cancel Cancel
42+
}{
43+
{
44+
name: "standard sqid",
45+
cancel: Cancel{Sqid: "queue1/100"},
46+
},
47+
{
48+
name: "long sqid",
49+
cancel: Cancel{Sqid: "my-long-queue-name/999999"},
50+
},
51+
}
52+
53+
for _, tt := range tests {
54+
t.Run(tt.name, func(t *testing.T) {
55+
data, err := tt.cancel.ToBytes()
56+
require.NoError(t, err)
57+
58+
deserialized, err := CancelFromBytes(data)
59+
require.NoError(t, err)
60+
assert.Equal(t, tt.cancel, deserialized)
61+
})
62+
}
63+
}

gateway/controller/cancel.go

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/uber-go/tally/v4"
88
"github.com/uber/submitqueue/core/metrics"
9+
"github.com/uber/submitqueue/entity"
910
"github.com/uber/submitqueue/entity/queue"
1011
extqueue "github.com/uber/submitqueue/extension/queue"
1112
pb "github.com/uber/submitqueue/gateway/protopb"
@@ -40,9 +41,25 @@ func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (r
4041

4142
sqid := req.Sqid
4243

43-
// TODO: Insert the request to the event store
44+
if sqid == "" {
45+
return &pb.CancelResponse{
46+
Error: &pb.Error{Message: "sqid is required"},
47+
}, nil
48+
}
49+
50+
// TODO -
51+
// Look up event store to see if sqid exists -
52+
// - if found and request is already in a terminal state; return cancellation failed with appropriate
53+
// error message.
54+
// - if found and request is not in a terminal state; publish a cancel entity onto the cancel topic
55+
// to be picked up by the orchestrator.
56+
// - if not found; publish a cancel entity onto the cancel topic
57+
// to be picked up by the orchestrator.
58+
cancel := entity.Cancel{
59+
Sqid: sqid,
60+
}
4461

45-
if err := c.publishToQueue(ctx, sqid); err != nil {
62+
if err := c.publishToQueue(ctx, cancel); err != nil {
4663
c.logger.Errorw("failed to publish cancel request to queue",
4764
"sqid", sqid,
4865
"error", err,
@@ -56,19 +73,23 @@ func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (r
5673
)
5774

5875
return &pb.CancelResponse{
59-
Sqid: sqid,
76+
Sqid: sqid,
77+
CurrentStatus: pb.RequestStatus_CANCELLATION_ACCEPTED,
6078
}, nil
6179
}
6280

63-
// publishToQueue publishes a cancel request to the cancel queue for async processing.
64-
func (c *CancelController) publishToQueue(ctx context.Context, sqid string) error {
65-
payload := []byte(sqid)
81+
// publishToQueue publishes a cancel entity to the cancel queue for async processing.
82+
func (c *CancelController) publishToQueue(ctx context.Context, cancel entity.Cancel) error {
83+
payload, err := cancel.ToBytes()
84+
if err != nil {
85+
return fmt.Errorf("failed to serialize cancel entity: %w", err)
86+
}
6687

6788
// Create queue message
6889
// - Message ID: sqid for idempotency
69-
// - Payload: sqid as bytes
70-
// - Partition key: sqid (ensures ordering per request)
71-
msg := queue.NewMessage(sqid, payload, sqid, nil)
90+
// - Payload: serialized Cancel entity
91+
// - Partition key: empty (no inherent ordering for cancel requests)
92+
msg := queue.NewMessage(cancel.Sqid, payload, "", nil)
7293

7394
if err := c.publisher.Publish(ctx, c.topic, msg); err != nil {
7495
return fmt.Errorf("failed to publish message: %w", err)

gateway/controller/cancel_test.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/stretchr/testify/assert"
99
"github.com/stretchr/testify/require"
1010
"github.com/uber-go/tally/v4"
11+
"github.com/uber/submitqueue/entity"
1112
"github.com/uber/submitqueue/entity/queue"
1213
queuemock "github.com/uber/submitqueue/extension/queue/mock"
1314
pb "github.com/uber/submitqueue/gateway/protopb"
@@ -21,6 +22,21 @@ func TestNewCancelController(t *testing.T) {
2122
require.NotNil(t, controller)
2223
}
2324

25+
func TestCancel_ReturnsErrorOnEmptySqid(t *testing.T) {
26+
ctrl := gomock.NewController(t)
27+
controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, noopPublisher(ctrl), "cancel")
28+
ctx := context.Background()
29+
30+
req := &pb.CancelRequest{Sqid: ""}
31+
resp, err := controller.Cancel(ctx, req)
32+
33+
require.NoError(t, err)
34+
assert.Empty(t, resp.Sqid)
35+
assert.Equal(t, pb.RequestStatus_UNKNOWN, resp.CurrentStatus)
36+
require.NotNil(t, resp.Error)
37+
assert.Equal(t, "sqid is required", resp.Error.Message)
38+
}
39+
2440
func TestCancel_ReturnsSqid(t *testing.T) {
2541
ctrl := gomock.NewController(t)
2642
controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, noopPublisher(ctrl), "cancel")
@@ -31,6 +47,7 @@ func TestCancel_ReturnsSqid(t *testing.T) {
3147

3248
require.NoError(t, err)
3349
assert.Equal(t, "test-queue/123", resp.Sqid)
50+
assert.Equal(t, pb.RequestStatus_CANCELLATION_ACCEPTED, resp.CurrentStatus)
3451
}
3552

3653
func TestCancel_PublishesToQueue(t *testing.T) {
@@ -55,12 +72,17 @@ func TestCancel_PublishesToQueue(t *testing.T) {
5572

5673
require.NoError(t, err)
5774
assert.Equal(t, "my-queue/42", resp.Sqid)
75+
assert.Equal(t, pb.RequestStatus_CANCELLATION_ACCEPTED, resp.CurrentStatus)
5876

5977
// Verify message was published to the cancel topic
6078
assert.Equal(t, "cancel", publishedTopic)
6179
assert.Equal(t, "my-queue/42", publishedMessage.ID)
6280
assert.Equal(t, "my-queue/42", publishedMessage.PartitionKey)
63-
assert.Equal(t, []byte("my-queue/42"), publishedMessage.Payload)
81+
82+
// Verify payload is a serialized Cancel entity
83+
cancel, err := entity.CancelFromBytes(publishedMessage.Payload)
84+
require.NoError(t, err)
85+
assert.Equal(t, "my-queue/42", cancel.Sqid)
6486
}
6587

6688
func TestCancel_ReturnsErrorOnPublishFailure(t *testing.T) {

gateway/proto/gateway.proto

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,32 @@ message CancelRequest {
7979
string sqid = 1;
8080
}
8181

82+
// RequestStatus defines the possible states of a land request.
83+
enum RequestStatus {
84+
// Unknown status.
85+
UNKNOWN = 0;
86+
// Initial state of a land request. Confirmed by the system but processing has not started.
87+
NEW = 1;
88+
// The request has been validated (duplicate check, merge check etc.) successfully.
89+
VALIDATED = 2;
90+
// The request is being processed.
91+
PROCESSING = 3;
92+
// The request has been successfully processed and landed. Terminal state.
93+
LANDED = 4;
94+
// The cancellation request has been accepted and is being processed.
95+
CANCELLATION_ACCEPTED = 5;
96+
// The request has been cancelled. Terminal state.
97+
CANCELLED = 6;
98+
}
99+
82100
// CancelResponse defines the response to a cancel request.
83101
message CancelResponse {
84-
// Globally unique identifier for the land request that was cancelled.
102+
// Globally unique identifier for the land request that was requested to be cancelled.
85103
string sqid = 1;
104+
// Current status of the land request at the time of cancellation request.
105+
RequestStatus current_status = 2;
106+
// Optional error describing why the cancellation could not be completed.
107+
Error error = 3;
86108
}
87109

88110
// ***************
@@ -116,7 +138,11 @@ service SubmitQueueGateway {
116138
// The processing is asynchronous and returns a LandResponse immediately. The land request is processed in the background.
117139
rpc Land(LandRequest) returns (LandResponse) {}
118140

119-
// Cancel cancels a previously submitted land request. The cancellation is asynchronous and
120-
// published to a queue for processing. No validation is performed on whether the request exists.
141+
// Cancel requests cancellation of a previously submitted land request.
142+
// The cancellation is asynchronous and published to a queue for
143+
// processing. Note that cancellation is best effort and not guaranteed.
144+
// To check for the status of the request, use Status API (TODO). A
145+
// "Cancelled" status indicates that the requested cancellation was
146+
// successful.
121147
rpc Cancel(CancelRequest) returns (CancelResponse) {}
122148
}

0 commit comments

Comments
 (0)