Skip to content

Commit bddbe79

Browse files
feat(gateway): add Status RPC to look up request status by sqid
Adds a read-only Status RPC to the gateway that returns the current customer-friendly status of a previously submitted request, reconciled from the append-only request log via core/request.GetCurrentStateFromRequestLog. Status is a free-form string (statuses can be added without breaking clients); unknown sqids map to a typed RequestNotFoundError (user error) and empty sqids to ErrInvalidRequest. Named "Status" to match the gateway's short RPC naming (Ping, Land, Status). Changes: - gateway/proto/gateway.proto: Status RPC, StatusRequest/StatusResponse, and RequestNotFoundError messages (regenerated protopb) - gateway/controller/status.go: StatusController + tests - example/server/gateway/main.go: wire controller (reuses existing RequestLogStore) Validation: - Unit tests: bazel test //gateway/controller:controller_test (PASS) - Integration (local stack): make local-gateway-start PORT=$(docker port submitqueue-gateway-service-1 8080 | head -1 | cut -d: -f2) # method is registered grpcurl -plaintext localhost:$PORT list uber.submitqueue.gateway.SubmitQueueGateway -> Land, Ping, Status # seed a request grpcurl -plaintext -d '{"queue":"test-queue","change":{"uris":["github:///pull/123/c3a4d5e6f7890123456789abcdef0123456789ab"]}}' \ localhost:$PORT uber.submitqueue.gateway.SubmitQueueGateway/Land -> { "sqid": "test-queue/1" } # happy path grpcurl -plaintext -d '{"sqid":"test-queue/1"}' \ localhost:$PORT uber.submitqueue.gateway.SubmitQueueGateway/Status -> { "status": "accepted" } # unknown sqid grpcurl -plaintext -d '{"sqid":"test-queue/999"}' \ localhost:$PORT uber.submitqueue.gateway.SubmitQueueGateway/Status -> ERROR: request not found for sqid "test-queue/999" # empty sqid grpcurl -plaintext -d '{"sqid":""}' \ localhost:$PORT uber.submitqueue.gateway.SubmitQueueGateway/Status -> ERROR: StatusController requires the request to have a sqid specified: invalid request Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 5f690a7 commit bddbe79

8 files changed

Lines changed: 639 additions & 66 deletions

File tree

example/server/gateway/main.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ import (
4343
// GatewayServer wraps the controller and implements the gRPC service interface
4444
type GatewayServer struct {
4545
pb.UnimplementedSubmitQueueGatewayServer
46-
pingController *controller.PingController
47-
landController *controller.LandController
46+
pingController *controller.PingController
47+
landController *controller.LandController
48+
statusController *controller.StatusController
4849
}
4950

5051
// Ping delegates to the controller
@@ -57,6 +58,11 @@ func (s *GatewayServer) Land(ctx context.Context, req *pb.LandRequest) (*pb.Land
5758
return s.landController.Land(ctx, req)
5859
}
5960

61+
// Status delegates to the controller
62+
func (s *GatewayServer) Status(ctx context.Context, req *pb.StatusRequest) (*pb.StatusResponse, error) {
63+
return s.statusController.Status(ctx, req)
64+
}
65+
6066
func main() {
6167
code := 0
6268
if err := run(); err != nil {
@@ -190,9 +196,11 @@ func run() error {
190196
// Create controllers and wrap them for gRPC
191197
pingController := controller.NewPingController(logger, scope)
192198
landController := controller.NewLandController(logger.Sugar(), scope, cnt, requestLogStore, queueConfigs, registry)
199+
statusController := controller.NewStatusController(logger.Sugar(), scope, requestLogStore)
193200
gatewayServer := &GatewayServer{
194-
pingController: pingController,
195-
landController: landController,
201+
pingController: pingController,
202+
landController: landController,
203+
statusController: statusController,
196204
}
197205

198206
pb.RegisterSubmitQueueGatewayServer(grpcServer, gatewayServer)

gateway/controller/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ go_library(
55
srcs = [
66
"land.go",
77
"ping.go",
8+
"status.go",
89
],
910
importpath = "github.com/uber/submitqueue/gateway/controller",
1011
visibility = ["//visibility:public"],
1112
deps = [
1213
"//core/consumer",
1314
"//core/errs",
15+
"//core/request",
1416
"//entity",
1517
"//entity/queue",
1618
"//extension/counter",
@@ -27,6 +29,7 @@ go_test(
2729
srcs = [
2830
"land_test.go",
2931
"ping_test.go",
32+
"status_test.go",
3033
],
3134
embed = [":controller"],
3235
deps = [
@@ -38,6 +41,7 @@ go_test(
3841
"//extension/queue/mock",
3942
"//extension/queueconfig",
4043
"//extension/queueconfig/mock",
44+
"//extension/storage",
4145
"//extension/storage/mock",
4246
"//gateway/protopb",
4347
"@com_github_stretchr_testify//assert",

gateway/controller/status.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package controller
16+
17+
import (
18+
"context"
19+
"errors"
20+
"fmt"
21+
"time"
22+
23+
"github.com/uber-go/tally/v4"
24+
"github.com/uber/submitqueue/core/errs"
25+
"github.com/uber/submitqueue/core/request"
26+
"github.com/uber/submitqueue/extension/storage"
27+
pb "github.com/uber/submitqueue/gateway/protopb"
28+
"go.uber.org/zap"
29+
)
30+
31+
// RequestNotFoundError indicates that no request log records exist for the
32+
// requested sqid. Either the sqid is wrong or the request has not been
33+
// accepted yet.
34+
type RequestNotFoundError struct {
35+
Sqid string
36+
}
37+
38+
// Error implements the error interface.
39+
func (e *RequestNotFoundError) Error() string {
40+
return fmt.Sprintf("request not found for sqid %q", e.Sqid)
41+
}
42+
43+
// IsRequestNotFound returns true if any error in the chain is a
44+
// *RequestNotFoundError.
45+
func IsRequestNotFound(err error) bool {
46+
var target *RequestNotFoundError
47+
return errors.As(err, &target)
48+
}
49+
50+
// StatusController handles request status business logic for the gateway.
51+
type StatusController struct {
52+
logger *zap.SugaredLogger
53+
metricsScope tally.Scope
54+
requestLogStore storage.RequestLogStore
55+
}
56+
57+
// NewStatusController creates a new instance of the gateway status controller.
58+
func NewStatusController(logger *zap.SugaredLogger, scope tally.Scope, requestLogStore storage.RequestLogStore) *StatusController {
59+
return &StatusController{
60+
logger: logger,
61+
metricsScope: scope,
62+
requestLogStore: requestLogStore,
63+
}
64+
}
65+
66+
// Status returns the current reconciled status of a request identified by its sqid.
67+
func (c *StatusController) Status(ctx context.Context, req *pb.StatusRequest) (*pb.StatusResponse, error) {
68+
start := time.Now()
69+
defer func() {
70+
c.metricsScope.Timer("status_latency").Record(time.Since(start))
71+
}()
72+
73+
c.metricsScope.Counter("status_count").Inc(1)
74+
75+
if req.Sqid == "" {
76+
return nil, fmt.Errorf("StatusController requires the request to have a sqid specified: %w", ErrInvalidRequest)
77+
}
78+
79+
state, err := request.GetCurrentStateFromRequestLog(ctx, c.requestLogStore, req.Sqid)
80+
if err != nil {
81+
if storage.IsNotFound(err) {
82+
return nil, errs.NewUserError(&RequestNotFoundError{Sqid: req.Sqid})
83+
}
84+
return nil, fmt.Errorf("StatusController failed to get current state for sqid=%s: %w", req.Sqid, err)
85+
}
86+
87+
c.logger.Debugw("request status retrieved",
88+
"sqid", req.Sqid,
89+
"status", string(state.Status),
90+
)
91+
92+
return &pb.StatusResponse{
93+
Status: string(state.Status),
94+
LastError: state.LastError,
95+
Metadata: state.Metadata,
96+
}, nil
97+
}

gateway/controller/status_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package controller
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
"github.com/uber-go/tally/v4"
25+
"github.com/uber/submitqueue/core/errs"
26+
"github.com/uber/submitqueue/entity"
27+
"github.com/uber/submitqueue/extension/storage"
28+
storagemock "github.com/uber/submitqueue/extension/storage/mock"
29+
pb "github.com/uber/submitqueue/gateway/protopb"
30+
"go.uber.org/mock/gomock"
31+
"go.uber.org/zap"
32+
)
33+
34+
func TestStatus_ReturnsCurrentState(t *testing.T) {
35+
ctrl := gomock.NewController(t)
36+
37+
store := storagemock.NewMockRequestLogStore(ctrl)
38+
store.EXPECT().List(gomock.Any(), "test-queue/1").Return([]entity.RequestLog{
39+
{RequestID: "test-queue/1", TimestampMs: 100, Status: entity.RequestStatusAccepted},
40+
{RequestID: "test-queue/1", TimestampMs: 200, Status: entity.RequestStatusValidating, LastError: "boom", Metadata: map[string]string{"k": "v"}},
41+
}, nil)
42+
43+
controller := NewStatusController(zap.NewNop().Sugar(), tally.NoopScope, store)
44+
45+
resp, err := controller.Status(context.Background(), &pb.StatusRequest{Sqid: "test-queue/1"})
46+
47+
require.NoError(t, err)
48+
assert.Equal(t, string(entity.RequestStatusValidating), resp.Status)
49+
assert.Equal(t, "boom", resp.LastError)
50+
assert.Equal(t, map[string]string{"k": "v"}, resp.Metadata)
51+
}
52+
53+
func TestStatus_Errors(t *testing.T) {
54+
tests := []struct {
55+
name string
56+
sqid string
57+
setupStore func(*storagemock.MockRequestLogStore)
58+
wantInvalid bool
59+
wantNotFound bool
60+
wantUserError bool
61+
}{
62+
{
63+
name: "empty sqid is an invalid request",
64+
sqid: "",
65+
wantInvalid: true,
66+
wantUserError: true,
67+
},
68+
{
69+
name: "unknown sqid maps to not found",
70+
sqid: "missing/1",
71+
setupStore: func(s *storagemock.MockRequestLogStore) {
72+
s.EXPECT().List(gomock.Any(), "missing/1").Return(nil, storage.ErrNotFound)
73+
},
74+
wantNotFound: true,
75+
wantUserError: true,
76+
},
77+
{
78+
name: "store failure propagates as infra error",
79+
sqid: "test-queue/1",
80+
setupStore: func(s *storagemock.MockRequestLogStore) {
81+
s.EXPECT().List(gomock.Any(), "test-queue/1").Return(nil, fmt.Errorf("log backend down"))
82+
},
83+
},
84+
}
85+
86+
for _, tt := range tests {
87+
t.Run(tt.name, func(t *testing.T) {
88+
ctrl := gomock.NewController(t)
89+
store := storagemock.NewMockRequestLogStore(ctrl)
90+
if tt.setupStore != nil {
91+
tt.setupStore(store)
92+
}
93+
94+
controller := NewStatusController(zap.NewNop().Sugar(), tally.NoopScope, store)
95+
96+
_, err := controller.Status(context.Background(), &pb.StatusRequest{Sqid: tt.sqid})
97+
98+
require.Error(t, err)
99+
// IsRequestNotFound is a precise type check; IsInvalidRequest matches any
100+
// user error (see errs.userError.Is), so only assert it where it is the
101+
// defining classification.
102+
assert.Equal(t, tt.wantNotFound, IsRequestNotFound(err))
103+
assert.Equal(t, tt.wantUserError, errs.IsUserError(err))
104+
assert.False(t, errs.IsRetryable(err))
105+
if tt.wantInvalid {
106+
assert.True(t, IsInvalidRequest(err))
107+
}
108+
109+
if tt.wantNotFound {
110+
var typed *RequestNotFoundError
111+
require.ErrorAs(t, err, &typed)
112+
assert.Equal(t, tt.sqid, typed.Sqid)
113+
}
114+
})
115+
}
116+
}

gateway/proto/gateway.proto

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,24 @@ message LandResponse {
9292
string sqid = 1;
9393
}
9494

95+
// StatusRequest defines a request to look up the current status of a previously submitted request.
96+
message StatusRequest {
97+
// Globally unique identifier for the request, as returned by Land in the LandResponse.
98+
string sqid = 1;
99+
}
100+
101+
// StatusResponse defines the response to a status request.
102+
// The status is eventually consistent with the request store; it might take some time to converge, typically no more than a few seconds.
103+
message StatusResponse {
104+
// Current customer-friendly status of the request (e.g. "accepted", "validating", "landed", "error").
105+
// Status is a free-form string because the system may introduce new statuses without breaking existing clients.
106+
string status = 1;
107+
// Last error message associated with the current status. Empty string if there is no error.
108+
string last_error = 2;
109+
// Free-form key-value metadata associated with the current status, for display or debugging purposes. Empty if none.
110+
map<string, string> metadata = 3;
111+
}
112+
95113
// ***************
96114
// Error messages, returned as `google.rpc.Status` messages.
97115
// ***************
@@ -110,6 +128,14 @@ message UnrecognizedQueueError {
110128
string queue = 2;
111129
}
112130

131+
// RequestNotFoundError is returned when no request is found for the given sqid. Typically this indicates a typo in the sqid or that the request has not been accepted yet.
132+
message RequestNotFoundError {
133+
// Free text error message describing the error.
134+
Error error = 1;
135+
// The sqid that was not found.
136+
string sqid = 2;
137+
}
138+
113139
// ***************
114140
// Service definitions.
115141
// ***************
@@ -122,4 +148,8 @@ service SubmitQueueGateway {
122148
// Land lands a set of code changes into a target branch, performing the necessary validations across all other changes in the queue.
123149
// The processing is asynchronous and returns a LandResponse immediately. The land request is processed in the background.
124150
rpc Land(LandRequest) returns (LandResponse) {}
151+
152+
// Status returns the current status of a previously submitted request, identified by its sqid.
153+
// The status is eventually consistent with the request store and reconciled from the append-only request log.
154+
rpc Status(StatusRequest) returns (StatusResponse) {}
125155
}

0 commit comments

Comments
 (0)