Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions example/server/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ import (
// GatewayServer wraps the controller and implements the gRPC service interface
type GatewayServer struct {
pb.UnimplementedSubmitQueueGatewayServer
pingController *controller.PingController
landController *controller.LandController
pingController *controller.PingController
landController *controller.LandController
statusController *controller.StatusController
}

// Ping delegates to the controller
Expand All @@ -57,6 +58,11 @@ func (s *GatewayServer) Land(ctx context.Context, req *pb.LandRequest) (*pb.Land
return s.landController.Land(ctx, req)
}

// Status delegates to the controller
func (s *GatewayServer) Status(ctx context.Context, req *pb.StatusRequest) (*pb.StatusResponse, error) {
return s.statusController.Status(ctx, req)
}

func main() {
code := 0
if err := run(); err != nil {
Expand Down Expand Up @@ -190,9 +196,11 @@ func run() error {
// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
landController := controller.NewLandController(logger.Sugar(), scope, cnt, requestLogStore, queueConfigs, registry)
statusController := controller.NewStatusController(logger.Sugar(), scope, requestLogStore)
gatewayServer := &GatewayServer{
pingController: pingController,
landController: landController,
pingController: pingController,
landController: landController,
statusController: statusController,
}

pb.RegisterSubmitQueueGatewayServer(grpcServer, gatewayServer)
Expand Down
4 changes: 4 additions & 0 deletions gateway/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ go_library(
srcs = [
"land.go",
"ping.go",
"status.go",
],
importpath = "github.com/uber/submitqueue/gateway/controller",
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//core/errs",
"//core/metrics",
"//core/request",
"//entity",
"//entity/queue",
"//extension/counter",
Expand All @@ -28,6 +30,7 @@ go_test(
srcs = [
"land_test.go",
"ping_test.go",
"status_test.go",
],
embed = [":controller"],
deps = [
Expand All @@ -39,6 +42,7 @@ go_test(
"//extension/queue/mock",
"//extension/queueconfig",
"//extension/queueconfig/mock",
"//extension/storage",
"//extension/storage/mock",
"//gateway/protopb",
"@com_github_stretchr_testify//assert",
Expand Down
97 changes: 97 additions & 0 deletions gateway/controller/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
"context"
"errors"
"fmt"
"time"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/core/request"
"github.com/uber/submitqueue/extension/storage"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/zap"
)

// RequestNotFoundError indicates that no request log records exist for the
// requested sqid. Either the sqid is wrong or the request has not been
// accepted yet.
type RequestNotFoundError struct {
Sqid string
}

// Error implements the error interface.
func (e *RequestNotFoundError) Error() string {
return fmt.Sprintf("request not found for sqid %q", e.Sqid)
}

// IsRequestNotFound returns true if any error in the chain is a
// *RequestNotFoundError.
func IsRequestNotFound(err error) bool {
var target *RequestNotFoundError
return errors.As(err, &target)
}

// StatusController handles request status business logic for the gateway.
type StatusController struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
requestLogStore storage.RequestLogStore
}

// NewStatusController creates a new instance of the gateway status controller.
func NewStatusController(logger *zap.SugaredLogger, scope tally.Scope, requestLogStore storage.RequestLogStore) *StatusController {
return &StatusController{
logger: logger,
metricsScope: scope,
requestLogStore: requestLogStore,
}
}

// Status returns the current reconciled status of a request identified by its sqid.
func (c *StatusController) Status(ctx context.Context, req *pb.StatusRequest) (*pb.StatusResponse, error) {
start := time.Now()
defer func() {
c.metricsScope.Timer("status_latency").Record(time.Since(start))
}()

c.metricsScope.Counter("status_count").Inc(1)

if req.Sqid == "" {
return nil, fmt.Errorf("StatusController requires the request to have a sqid specified: %w", ErrInvalidRequest)
}

state, err := request.GetCurrentStateFromRequestLog(ctx, c.requestLogStore, req.Sqid)
if err != nil {
if storage.IsNotFound(err) {
return nil, errs.NewUserError(&RequestNotFoundError{Sqid: req.Sqid})
}
return nil, fmt.Errorf("StatusController failed to get current state for sqid=%s: %w", req.Sqid, err)
}

c.logger.Debugw("request status retrieved",
"sqid", req.Sqid,
"status", string(state.Status),
)

return &pb.StatusResponse{
Status: string(state.Status),
LastError: state.LastError,
Metadata: state.Metadata,
}, nil
}
116 changes: 116 additions & 0 deletions gateway/controller/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/storage"
storagemock "github.com/uber/submitqueue/extension/storage/mock"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
)

func TestStatus_ReturnsCurrentState(t *testing.T) {
ctrl := gomock.NewController(t)

store := storagemock.NewMockRequestLogStore(ctrl)
store.EXPECT().List(gomock.Any(), "test-queue/1").Return([]entity.RequestLog{
{RequestID: "test-queue/1", TimestampMs: 100, Status: entity.RequestStatusAccepted},
{RequestID: "test-queue/1", TimestampMs: 200, Status: entity.RequestStatusValidating, LastError: "boom", Metadata: map[string]string{"k": "v"}},
}, nil)

controller := NewStatusController(zap.NewNop().Sugar(), tally.NoopScope, store)

resp, err := controller.Status(context.Background(), &pb.StatusRequest{Sqid: "test-queue/1"})

require.NoError(t, err)
assert.Equal(t, string(entity.RequestStatusValidating), resp.Status)
assert.Equal(t, "boom", resp.LastError)
assert.Equal(t, map[string]string{"k": "v"}, resp.Metadata)
}

func TestStatus_Errors(t *testing.T) {
tests := []struct {
name string
sqid string
setupStore func(*storagemock.MockRequestLogStore)
wantInvalid bool
wantNotFound bool
wantUserError bool
}{
{
name: "empty sqid is an invalid request",
sqid: "",
wantInvalid: true,
wantUserError: true,
},
{
name: "unknown sqid maps to not found",
sqid: "missing/1",
setupStore: func(s *storagemock.MockRequestLogStore) {
s.EXPECT().List(gomock.Any(), "missing/1").Return(nil, storage.ErrNotFound)
},
wantNotFound: true,
wantUserError: true,
},
{
name: "store failure propagates as infra error",
sqid: "test-queue/1",
setupStore: func(s *storagemock.MockRequestLogStore) {
s.EXPECT().List(gomock.Any(), "test-queue/1").Return(nil, fmt.Errorf("log backend down"))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
store := storagemock.NewMockRequestLogStore(ctrl)
if tt.setupStore != nil {
tt.setupStore(store)
}

controller := NewStatusController(zap.NewNop().Sugar(), tally.NoopScope, store)

_, err := controller.Status(context.Background(), &pb.StatusRequest{Sqid: tt.sqid})

require.Error(t, err)
// IsRequestNotFound is a precise type check; IsInvalidRequest matches any
// user error (see errs.userError.Is), so only assert it where it is the
// defining classification.
assert.Equal(t, tt.wantNotFound, IsRequestNotFound(err))
assert.Equal(t, tt.wantUserError, errs.IsUserError(err))
assert.False(t, errs.IsRetryable(err))
if tt.wantInvalid {
assert.True(t, IsInvalidRequest(err))
}

if tt.wantNotFound {
var typed *RequestNotFoundError
require.ErrorAs(t, err, &typed)
assert.Equal(t, tt.sqid, typed.Sqid)
}
})
}
}
30 changes: 30 additions & 0 deletions gateway/proto/gateway.proto
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,24 @@ message LandResponse {
string sqid = 1;
}

// StatusRequest defines a request to look up the current status of a previously submitted request.
message StatusRequest {
// Globally unique identifier for the request, as returned by Land in the LandResponse.
string sqid = 1;
}

// StatusResponse defines the response to a status request.
// The status is eventually consistent with the request store; it might take some time to converge, typically no more than a few seconds.
message StatusResponse {
// Current customer-friendly status of the request (e.g. "accepted", "validating", "landed", "error").
// Status is a free-form string because the system may introduce new statuses without breaking existing clients.
string status = 1;
// Last error message associated with the current status. Empty string if there is no error.
string last_error = 2;
// Free-form key-value metadata associated with the current status, for display or debugging purposes. Empty if none.
map<string, string> metadata = 3;
}

// ***************
// Error messages, returned as `google.rpc.Status` messages.
// ***************
Expand All @@ -110,6 +128,14 @@ message UnrecognizedQueueError {
string queue = 2;
}

// RequestNotFoundError is returned when no request is found for the given sqid. Typically this indicates a typo in the sqid or that the request has not been accepted yet.
message RequestNotFoundError {
// Free text error message describing the error.
Error error = 1;
// The sqid that was not found.
string sqid = 2;
}

// ***************
// Service definitions.
// ***************
Expand All @@ -122,4 +148,8 @@ service SubmitQueueGateway {
// Land lands a set of code changes into a target branch, performing the necessary validations across all other changes in the queue.
// The processing is asynchronous and returns a LandResponse immediately. The land request is processed in the background.
rpc Land(LandRequest) returns (LandResponse) {}

// Status returns the current status of a previously submitted request, identified by its sqid.
// The status is eventually consistent with the request store and reconciled from the append-only request log.
rpc Status(StatusRequest) returns (StatusResponse) {}
}
Loading
Loading