From 36802e32a590dc425eb54fbf035a020f3befaf24 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Tue, 19 May 2026 13:40:18 -0700 Subject: [PATCH] feat(stovepipe): add new Ping-only service scaffold Adds a third service alongside gateway and orchestrator with proto, controller, server binary, example client, docker-compose, and an integration test. Only the Ping RPC is implemented; the wiring is in place so future RPCs can be added without rebuilding the skeleton. Stovepipe is stateless (no MySQL, no queue) and listens on :8083 by default. Makefile targets (build, proto, integration-test, local-start, local-stop, run-client) mirror the gateway/orchestrator pairs. --- BUILD.bazel | 1 + Makefile | 41 ++- example/client/stovepipe/BUILD.bazel | 19 ++ example/client/stovepipe/main.go | 78 ++++++ example/server/stovepipe/BUILD.bazel | 27 ++ example/server/stovepipe/Dockerfile | 12 + example/server/stovepipe/docker-compose.yml | 19 ++ example/server/stovepipe/main.go | 165 +++++++++++++ stovepipe/README.md | 1 + stovepipe/controller/BUILD.bazel | 26 ++ stovepipe/controller/ping.go | 72 ++++++ stovepipe/controller/ping_test.go | 104 ++++++++ stovepipe/proto/BUILD.bazel | 35 +++ stovepipe/proto/stovepipe.proto | 46 ++++ stovepipe/protopb/BUILD.bazel | 27 ++ stovepipe/protopb/stovepipe.pb.go | 223 +++++++++++++++++ stovepipe/protopb/stovepipe.pb.yarpc.go | 261 ++++++++++++++++++++ stovepipe/protopb/stovepipe_grpc.pb.go | 142 +++++++++++ test/integration/stovepipe/BUILD.bazel | 20 ++ test/integration/stovepipe/suite_test.go | 97 ++++++++ 20 files changed, 1414 insertions(+), 2 deletions(-) create mode 100644 example/client/stovepipe/BUILD.bazel create mode 100644 example/client/stovepipe/main.go create mode 100644 example/server/stovepipe/BUILD.bazel create mode 100644 example/server/stovepipe/Dockerfile create mode 100644 example/server/stovepipe/docker-compose.yml create mode 100644 example/server/stovepipe/main.go create mode 100644 stovepipe/README.md create mode 100644 stovepipe/controller/BUILD.bazel create mode 100644 stovepipe/controller/ping.go create mode 100644 stovepipe/controller/ping_test.go create mode 100644 stovepipe/proto/BUILD.bazel create mode 100644 stovepipe/proto/stovepipe.proto create mode 100644 stovepipe/protopb/BUILD.bazel create mode 100644 stovepipe/protopb/stovepipe.pb.go create mode 100644 stovepipe/protopb/stovepipe.pb.yarpc.go create mode 100644 stovepipe/protopb/stovepipe_grpc.pb.go create mode 100644 test/integration/stovepipe/BUILD.bazel create mode 100644 test/integration/stovepipe/suite_test.go diff --git a/BUILD.bazel b/BUILD.bazel index 8ed1687d..9aa85258 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -5,6 +5,7 @@ load("@gazelle//:def.bzl", "gazelle") # Resolve protobuf import ambiguities - use the actual protopb packages, not the proto aliases # gazelle:resolve go github.com/uber/submitqueue/gateway/protopb //gateway/protopb # gazelle:resolve go github.com/uber/submitqueue/orchestrator/protopb //orchestrator/protopb +# gazelle:resolve go github.com/uber/submitqueue/stovepipe/protopb //stovepipe/protopb # Export marker files for test data dependencies (used by FindRepoRoot in tests) exports_files( diff --git a/Makefile b/Makefile index 2d6bedef..0a309303 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,7 @@ COMPOSE = docker-compose COMPOSE_FILE = example/server/docker-compose.yml GATEWAY_COMPOSE_FILE = example/server/gateway/docker-compose.yml ORCHESTRATOR_COMPOSE_FILE = example/server/orchestrator/docker-compose.yml +STOVEPIPE_COMPOSE_FILE = example/server/stovepipe/docker-compose.yml # Fixed project name for local manual testing (tests use unique random names) LOCAL_PROJECT = submitqueue @@ -30,7 +31,7 @@ define assert_clean fi endef -.PHONY: build build-all-linux build-gateway-linux build-orchestrator-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-consumer integration-test-extensions integration-test-gateway integration-test-orchestrator license-fix lint lint-fmt lint-license local-clean local-gateway-start local-gateway-stop local-init-schemas local-logs local-orchestrator-start local-orchestrator-stop local-ps local-restart local-start local-stop mocks proto query-deps query-targets run-client-gateway run-client-orchestrator run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help +.PHONY: build build-all-linux build-gateway-linux build-orchestrator-linux build-stovepipe-linux check-gazelle check-mocks check-tidy clean clean-proto deps e2e-test fmt gazelle integration-test integration-test-consumer integration-test-extensions integration-test-gateway integration-test-orchestrator integration-test-stovepipe license-fix lint lint-fmt lint-license local-clean local-gateway-start local-gateway-stop local-init-schemas local-logs local-orchestrator-start local-orchestrator-stop local-ps local-restart local-start local-stop local-stovepipe-start local-stovepipe-stop mocks proto query-deps query-targets run-client-gateway run-client-orchestrator run-client-stovepipe run-queue-admin test test-no-cache tidy tidy-bazel tidy-go help build: ## Build all services and examples @@ -39,7 +40,7 @@ build: ## Build all services and examples @echo "Build complete!" # Build Linux binaries required for Docker containers -build-all-linux: build-gateway-linux build-orchestrator-linux ## Build all Linux binaries for Docker +build-all-linux: build-gateway-linux build-orchestrator-linux build-stovepipe-linux ## Build all Linux binaries for Docker @echo "All Linux binaries ready for Docker" build-gateway-linux: ## Build Gateway Linux binary for Docker @@ -58,6 +59,14 @@ build-orchestrator-linux: ## Build Orchestrator Linux binary for Docker cp -f bazel-bin/example/server/orchestrator/orchestrator .docker-bin/orchestrator @echo "Orchestrator Linux binary ready at .docker-bin/orchestrator" +build-stovepipe-linux: ## Build Stovepipe Linux binary for Docker + @echo "Building Stovepipe Linux binary for Docker..." + @$(BAZEL) build --platforms=@rules_go//go/toolchain:linux_amd64 //example/server/stovepipe:stovepipe + @mkdir -p .docker-bin + @cp -f bazel-bin/example/server/stovepipe/stovepipe_/stovepipe .docker-bin/stovepipe 2>/dev/null || \ + cp -f bazel-bin/example/server/stovepipe/stovepipe .docker-bin/stovepipe + @echo "Stovepipe Linux binary ready at .docker-bin/stovepipe" + check-gazelle: ## Check BUILD.bazel files are up to date @echo "Running Gazelle to check BUILD files..." @$(BAZEL) run //:gazelle @@ -82,6 +91,7 @@ clean-proto: ## Clean generated proto files @echo "Cleaning generated proto files..." @rm -rf gateway/protopb/*.pb.go @rm -rf orchestrator/protopb/*.pb.go + @rm -rf stovepipe/protopb/*.pb.go @echo "Proto clean complete!" deps: tidy-go ## Download and tidy Go dependencies @@ -122,6 +132,10 @@ integration-test-orchestrator: build-orchestrator-linux ## Run Orchestrator inte @echo "Running Orchestrator integration tests..." @$(BAZEL) test //test/integration/orchestrator:orchestrator_test --test_output=streamed +integration-test-stovepipe: build-stovepipe-linux ## Run Stovepipe integration tests (auto-builds binary) + @echo "Running Stovepipe integration tests..." + @$(BAZEL) test //test/integration/stovepipe:stovepipe_test --test_output=streamed + license-fix: ## Add missing license headers to source files @$(BAZEL) run //tool/linter/licenseheader -- --fix @@ -243,6 +257,21 @@ local-stop: ## Stop all services (keep data) @$(COMPOSE) -f $(COMPOSE_FILE) -p $(LOCAL_PROJECT) down @echo "Services stopped. Data volumes preserved." +local-stovepipe-start: build-stovepipe-linux ## Start Stovepipe service locally (stateless, no databases) + @echo "Starting Stovepipe with docker-compose..." + @$(COMPOSE) -f $(STOVEPIPE_COMPOSE_FILE) -p $(LOCAL_PROJECT) up -d --build --wait + @echo "" + @echo "✅ Stovepipe is running!" + @echo "" + @$(COMPOSE) -f $(STOVEPIPE_COMPOSE_FILE) -p $(LOCAL_PROJECT) ps + @echo "" + @echo "Stovepipe gRPC port: $$(docker port $(LOCAL_PROJECT)-stovepipe-service-1 8080 2>/dev/null | cut -d: -f2 || echo 'unknown')" + +local-stovepipe-stop: ## Stop Stovepipe service + @echo "Stopping Stovepipe services..." + @$(COMPOSE) -f $(STOVEPIPE_COMPOSE_FILE) -p $(LOCAL_PROJECT) down + @echo "Stovepipe services stopped." + mocks: ## Generate mock files using mockgen @echo "Generating mocks..." @$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/pusher/... ./extension/scorer/... ./core/consumer/... @@ -258,6 +287,10 @@ proto: ## Generate protobuf files from .proto definitions --go-grpc_out=orchestrator/protopb --go-grpc_opt=paths=source_relative \ --yarpc-go_out=orchestrator/protopb --yarpc-go_opt=paths=source_relative \ --proto_path=orchestrator/proto orchestrator/proto/orchestrator.proto + @protoc --go_out=stovepipe/protopb --go_opt=paths=source_relative \ + --go-grpc_out=stovepipe/protopb --go-grpc_opt=paths=source_relative \ + --yarpc-go_out=stovepipe/protopb --yarpc-go_opt=paths=source_relative \ + --proto_path=stovepipe/proto stovepipe/proto/stovepipe.proto @echo "Protobuf files generated successfully!" # Bazel query helpers @@ -275,6 +308,10 @@ run-client-gateway: run-client-orchestrator: @$(BAZEL) run //example/client/orchestrator:orchestrator -- -addr $(or $(SERVER_ADDR),localhost:8082) -message "$(or $(MESSAGE),ping)" +# Run stovepipe client (connects to any running stovepipe service) +run-client-stovepipe: + @$(BAZEL) run //example/client/stovepipe:stovepipe -- -addr $(or $(SERVER_ADDR),localhost:8083) -message "$(or $(MESSAGE),ping)" + run-queue-admin: ## Run queue-admin CLI (use ARGS to pass arguments, e.g. make run-queue-admin ARGS="list-topics") @$(BAZEL) run //extension/queue/mysql/ctl -- $(ARGS) diff --git a/example/client/stovepipe/BUILD.bazel b/example/client/stovepipe/BUILD.bazel new file mode 100644 index 00000000..eed30fd1 --- /dev/null +++ b/example/client/stovepipe/BUILD.bazel @@ -0,0 +1,19 @@ +load("@rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "stovepipe_lib", + srcs = ["main.go"], + importpath = "github.com/uber/submitqueue/example/client/stovepipe", + visibility = ["//visibility:private"], + deps = [ + "//stovepipe/protopb", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//credentials/insecure", + ], +) + +go_binary( + name = "stovepipe", + embed = [":stovepipe_lib"], + visibility = ["//visibility:public"], +) diff --git a/example/client/stovepipe/main.go b/example/client/stovepipe/main.go new file mode 100644 index 00000000..69bb35d3 --- /dev/null +++ b/example/client/stovepipe/main.go @@ -0,0 +1,78 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "fmt" + "os" + "time" + + pb "github.com/uber/submitqueue/stovepipe/protopb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func main() { + addr := flag.String("addr", "localhost:8083", "stovepipe server address") + message := flag.String("message", "", "message to send in ping request") + timeout := flag.Duration("timeout", 5*time.Second, "request timeout") + flag.Parse() + + if err := run(*addr, *message, *timeout); err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } +} + +func run(addr, message string, timeout time.Duration) error { + // Create a gRPC connection + conn, err := grpc.NewClient( + addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + defer conn.Close() + + // Create a client + client := pb.NewSubmitQueueStovepipeClient(conn) + + // Create context with timeout + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // Make the ping request + req := &pb.PingRequest{ + Message: message, + } + + fmt.Printf("Sending ping to stovepipe at %s...\n", addr) + resp, err := client.Ping(ctx, req) + if err != nil { + return fmt.Errorf("ping failed: %w", err) + } + + // Print the response + fmt.Printf("\nResponse:\n") + fmt.Printf(" Message: %s\n", resp.Message) + fmt.Printf(" Service Name: %s\n", resp.ServiceName) + fmt.Printf(" Timestamp: %d (%s)\n", resp.Timestamp, time.Unix(resp.Timestamp, 0)) + fmt.Printf(" Hostname: %s\n", resp.Hostname) + + return nil +} diff --git a/example/server/stovepipe/BUILD.bazel b/example/server/stovepipe/BUILD.bazel new file mode 100644 index 00000000..a8c8b830 --- /dev/null +++ b/example/server/stovepipe/BUILD.bazel @@ -0,0 +1,27 @@ +load("@rules_go//go:def.bzl", "go_binary", "go_library") + +exports_files( + ["docker-compose.yml"], + visibility = ["//visibility:public"], +) + +go_library( + name = "stovepipe_lib", + srcs = ["main.go"], + importpath = "github.com/uber/submitqueue/example/server/stovepipe", + visibility = ["//visibility:private"], + deps = [ + "//stovepipe/controller", + "//stovepipe/protopb", + "@com_github_uber_go_tally_v4//:tally", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//reflection", + "@org_uber_go_zap//:zap", + ], +) + +go_binary( + name = "stovepipe", + embed = [":stovepipe_lib"], + visibility = ["//visibility:public"], +) diff --git a/example/server/stovepipe/Dockerfile b/example/server/stovepipe/Dockerfile new file mode 100644 index 00000000..355a93bd --- /dev/null +++ b/example/server/stovepipe/Dockerfile @@ -0,0 +1,12 @@ +FROM debian:bookworm-slim + +RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* +WORKDIR /root/ + +# Copy pre-built Linux binary +# Built via: make build-stovepipe-linux +COPY .docker-bin/stovepipe ./stovepipe + +EXPOSE 8080 + +CMD ["./stovepipe"] diff --git a/example/server/stovepipe/docker-compose.yml b/example/server/stovepipe/docker-compose.yml new file mode 100644 index 00000000..88ea99f4 --- /dev/null +++ b/example/server/stovepipe/docker-compose.yml @@ -0,0 +1,19 @@ +# Docker Compose for Stovepipe Manual Testing +# +# IMPORTANT: Before running docker-compose, build the Linux binary: +# make build-stovepipe-linux +# OR +# bazel build --platforms=@rules_go//go/toolchain:linux_amd64 //example/server/stovepipe +# +# Quick start: +# make local-stovepipe-start + +services: + stovepipe-service: + build: + context: ${REPO_ROOT} + dockerfile: example/server/stovepipe/Dockerfile + ports: + - "8080" # Random ephemeral port to avoid conflicts + environment: + - PORT=:8080 diff --git a/example/server/stovepipe/main.go b/example/server/stovepipe/main.go new file mode 100644 index 00000000..8cdbb9e1 --- /dev/null +++ b/example/server/stovepipe/main.go @@ -0,0 +1,165 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "errors" + "fmt" + "net" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/stovepipe/controller" + pb "github.com/uber/submitqueue/stovepipe/protopb" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +// StovepipeServer wraps the controller and implements the gRPC service interface +type StovepipeServer struct { + pb.UnimplementedSubmitQueueStovepipeServer + pingController *controller.PingController +} + +// Ping delegates to the controller +func (s *StovepipeServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) { + return s.pingController.Ping(ctx, req) +} + +func main() { + code := 0 + if err := run(); err != nil { + if errors.Is(err, context.Canceled) { + fmt.Println("Stovepipe server stopped by signal") + + // Return 143 (128 + SIGTERM) as per POSIX standard if the application receives any termination signal from the OS. Ideally we should return 128+SIGINT for SIGINT and 128+SIGTERM for SIGTERM, + // but it will require a special processing not yet available in the standard library. + code = 128 + int(syscall.SIGTERM) + } else { + fmt.Fprintf(os.Stderr, "Stovepipe server failure: %v\n", err) + // TODO: classify errors and implement a binary protocol for exit codes, so far 1 for everything + code = 1 + } + } + os.Exit(code) +} + +func run() error { + // Set up signal handling early so retry loops can respond to SIGTERM + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + // Initialize development logger (human-readable console output) + logger, err := zap.NewDevelopment() + if err != nil { + return fmt.Errorf("failed to create logger: %w", err) + } + defer logger.Sync() + + // Initialize metrics scope + scope := tally.NewTestScope("stovepipe", nil) + metricsStopCh := make(chan interface{}, 1) + metricsWgDone := sync.WaitGroup{} + metricsWgDone.Add(1) + go func() { + defer metricsWgDone.Done() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-metricsStopCh: + return + case <-ticker.C: + snapshot := scope.Snapshot() + logger.Info("metrics snapshot", + zap.Any("counters", snapshot.Counters()), + zap.Any("gauges", snapshot.Gauges()), + zap.Any("timers", snapshot.Timers()), + ) + } + } + }() + + defer func() { + close(metricsStopCh) + metricsWgDone.Wait() + }() + + // Create gRPC server + grpcServer := grpc.NewServer() + + // Create ping controller and wrap it for gRPC + pingController := controller.NewPingController(logger, scope) + stovepipeServer := &StovepipeServer{ + pingController: pingController, + } + pb.RegisterSubmitQueueStovepipeServer(grpcServer, stovepipeServer) + + // Register reflection service for debugging with grpcurl + reflection.Register(grpcServer) + + // Listen on configurable port + port := os.Getenv("PORT") + if port == "" { + port = ":8083" + } + listener, err := net.Listen("tcp", port) + if err != nil { + return fmt.Errorf("failed to listen on port %s: %w", port, err) + } + + fmt.Printf("Stovepipe gRPC server is running on %s\n", port) + fmt.Println("Press Ctrl+C to stop, or send a SIGTERM.") + + // Start server in a goroutine and wait for it to finish + serverErrCh := make(chan error, 1) + go func() { + serverErrCh <- grpcServer.Serve(listener) + }() + + // Wait for interrupt signal or server critical error + // If interruption is signaled, gracefully stop the server + // If an error happens during shutdown, return the actual error, not the context cancellation error + var serverErr error + select { + case <-ctx.Done(): + fmt.Println("Shutting down stovepipe server due to interruption signal...") + + // Set the error to the context cancellation error to be surfaced as a desired exit code by the main function + // to indicate that the server was stopped as intended + // It may be overridden by the server error if any + err = ctx.Err() + + // stop GRPC server and wait for it to exit + grpcServer.GracefulStop() + serverErr = <-serverErrCh + case serverErr = <-serverErrCh: + fmt.Println("Shutting down stovepipe server due to critical GRPC server error...") + } + + if serverErr != nil { + err = fmt.Errorf("GRPC server exited with error: %w", serverErr) + } + + return err +} diff --git a/stovepipe/README.md b/stovepipe/README.md new file mode 100644 index 00000000..fb922823 --- /dev/null +++ b/stovepipe/README.md @@ -0,0 +1 @@ +SubmitQueue Stovepipe diff --git a/stovepipe/controller/BUILD.bazel b/stovepipe/controller/BUILD.bazel new file mode 100644 index 00000000..b86190f4 --- /dev/null +++ b/stovepipe/controller/BUILD.bazel @@ -0,0 +1,26 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "controller", + srcs = ["ping.go"], + importpath = "github.com/uber/submitqueue/stovepipe/controller", + visibility = ["//visibility:public"], + deps = [ + "//stovepipe/protopb", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "controller_test", + srcs = ["ping_test.go"], + embed = [":controller"], + deps = [ + "//stovepipe/protopb", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) diff --git a/stovepipe/controller/ping.go b/stovepipe/controller/ping.go new file mode 100644 index 00000000..c85bedae --- /dev/null +++ b/stovepipe/controller/ping.go @@ -0,0 +1,72 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "os" + "time" + + "github.com/uber-go/tally/v4" + pb "github.com/uber/submitqueue/stovepipe/protopb" + "go.uber.org/zap" +) + +// PingController handles ping business logic for the stovepipe +type PingController struct { + logger *zap.Logger + metricsScope tally.Scope +} + +// NewPingController creates a new instance of the stovepipe ping controller +func NewPingController(logger *zap.Logger, scope tally.Scope) *PingController { + return &PingController{ + logger: logger, + metricsScope: scope, + } +} + +// Ping handles the ping request and returns a response +func (c *PingController) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) { + start := time.Now() + defer func() { + c.metricsScope.Timer("ping_latency").Record(time.Since(start)) + }() + + c.metricsScope.Counter("ping_requests_total").Inc(1) + + message := "pong!" + isEcho := false + if req.Message != "" { + message = "echo: " + req.Message + isEcho = true + c.metricsScope.Counter("echo_requests_total").Inc(1) + } + + hostname, _ := os.Hostname() + + c.logger.Info("ping request received", + zap.String("message", req.Message), + zap.Bool("is_echo", isEcho), + zap.String("hostname", hostname), + ) + + return &pb.PingResponse{ + Message: message, + ServiceName: "stovepipe", + Timestamp: time.Now().Unix(), + Hostname: hostname, + }, nil +} diff --git a/stovepipe/controller/ping_test.go b/stovepipe/controller/ping_test.go new file mode 100644 index 00000000..ee628c27 --- /dev/null +++ b/stovepipe/controller/ping_test.go @@ -0,0 +1,104 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + pb "github.com/uber/submitqueue/stovepipe/protopb" + "go.uber.org/zap" +) + +func TestNewPingController(t *testing.T) { + controller := NewPingController(zap.NewNop(), tally.NoopScope) + require.NotNil(t, controller) +} + +func TestPing_DefaultMessage(t *testing.T) { + controller := NewPingController(zap.NewNop(), tally.NoopScope) + ctx := context.Background() + + req := &pb.PingRequest{} + resp, err := controller.Ping(ctx, req) + + require.NoError(t, err) + assert.Equal(t, "pong!", resp.Message) +} + +func TestPing_CustomMessage(t *testing.T) { + controller := NewPingController(zap.NewNop(), tally.NoopScope) + ctx := context.Background() + + testCases := []struct { + name string + input string + expected string + }{ + {"simple message", "hello", "echo: hello"}, + {"message with spaces", "hello world", "echo: hello world"}, + {"special characters", "test!@#", "echo: test!@#"}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + req := &pb.PingRequest{Message: tc.input} + resp, err := controller.Ping(ctx, req) + + require.NoError(t, err) + assert.Equal(t, tc.expected, resp.Message) + }) + } +} + +func TestPing_ServiceName(t *testing.T) { + controller := NewPingController(zap.NewNop(), tally.NoopScope) + ctx := context.Background() + + req := &pb.PingRequest{} + resp, err := controller.Ping(ctx, req) + + require.NoError(t, err) + assert.Equal(t, "stovepipe", resp.ServiceName) +} + +func TestPing_Timestamp(t *testing.T) { + controller := NewPingController(zap.NewNop(), tally.NoopScope) + ctx := context.Background() + + before := time.Now().Unix() + req := &pb.PingRequest{} + resp, err := controller.Ping(ctx, req) + after := time.Now().Unix() + + require.NoError(t, err) + assert.GreaterOrEqual(t, resp.Timestamp, before) + assert.LessOrEqual(t, resp.Timestamp, after) +} + +func TestPing_Hostname(t *testing.T) { + controller := NewPingController(zap.NewNop(), tally.NoopScope) + ctx := context.Background() + + req := &pb.PingRequest{} + resp, err := controller.Ping(ctx, req) + + require.NoError(t, err) + assert.NotEmpty(t, resp.Hostname) +} diff --git a/stovepipe/proto/BUILD.bazel b/stovepipe/proto/BUILD.bazel new file mode 100644 index 00000000..6d93c5c3 --- /dev/null +++ b/stovepipe/proto/BUILD.bazel @@ -0,0 +1,35 @@ +load("@rules_go//go:def.bzl", "go_library") +load("@rules_go//proto:def.bzl", "go_proto_library") +load("@rules_proto//proto:defs.bzl", "proto_library") + +proto_library( + name = "stovepipepb_proto", + srcs = ["stovepipe.proto"], + visibility = ["//visibility:public"], +) + +# keep +go_proto_library( + name = "stovepipepb_go_proto", + compilers = [ + "@rules_go//proto:go_proto", + "@rules_go//proto:go_grpc_v2", + ], + importpath = "github.com/uber/submitqueue/stovepipe/proto", + proto = ":stovepipepb_proto", + visibility = ["//visibility:public"], +) + +go_library( + name = "proto", + embed = [":stovepipepb_go_proto"], + importpath = "github.com/uber/submitqueue/stovepipe/proto", + visibility = ["//visibility:public"], +) + +go_library( + name = "protopb", + embed = [":stovepipepb_go_proto"], + importpath = "github.com/uber/submitqueue/stovepipe/protopb", + visibility = ["//visibility:public"], +) diff --git a/stovepipe/proto/stovepipe.proto b/stovepipe/proto/stovepipe.proto new file mode 100644 index 00000000..3b8eec71 --- /dev/null +++ b/stovepipe/proto/stovepipe.proto @@ -0,0 +1,46 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package uber.submitqueue.stovepipe; + +option go_package = "github.com/uber/submitqueue/stovepipe/protopb"; +option java_multiple_files = true; +option java_outer_classname = "StovepipeProto"; +option java_package = "com.uber.submitqueue.stovepipe"; + +// PingRequest is the request for the Ping method +message PingRequest { + // Optional message to include in the ping + string message = 1; +} + +// PingResponse is the response for the Ping method +message PingResponse { + // The response message + string message = 1; + // The service name that handled the request + string service_name = 2; + // Timestamp of when the ping was received + int64 timestamp = 3; + // Hostname of the server that handled the request + string hostname = 4; +} + +// SubmitQueueStovepipe provides the stovepipe API +service SubmitQueueStovepipe { + // Ping returns a response indicating the service is alive + rpc Ping(PingRequest) returns (PingResponse) {} +} diff --git a/stovepipe/protopb/BUILD.bazel b/stovepipe/protopb/BUILD.bazel new file mode 100644 index 00000000..aca68418 --- /dev/null +++ b/stovepipe/protopb/BUILD.bazel @@ -0,0 +1,27 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "protopb", + srcs = [ + "stovepipe.pb.go", + "stovepipe.pb.yarpc.go", + "stovepipe_grpc.pb.go", + ], + importpath = "github.com/uber/submitqueue/stovepipe/protopb", + visibility = ["//visibility:public"], + deps = [ + "@com_github_gogo_protobuf//jsonpb", + "@com_github_gogo_protobuf//proto", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", + "@org_golang_google_protobuf//reflect/protoreflect", + "@org_golang_google_protobuf//runtime/protoimpl", + "@org_uber_go_fx//:fx", + "@org_uber_go_yarpc//:yarpc", + "@org_uber_go_yarpc//api/transport", + "@org_uber_go_yarpc//api/x/restriction", + "@org_uber_go_yarpc//encoding/protobuf", + "@org_uber_go_yarpc//encoding/protobuf/reflection", + ], +) diff --git a/stovepipe/protopb/stovepipe.pb.go b/stovepipe/protopb/stovepipe.pb.go new file mode 100644 index 00000000..16b936d6 --- /dev/null +++ b/stovepipe/protopb/stovepipe.pb.go @@ -0,0 +1,223 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.10 +// protoc v5.29.3 +// source: stovepipe.proto + +package protopb + +import ( + reflect "reflect" + sync "sync" + unsafe "unsafe" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// PingRequest is the request for the Ping method +type PingRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Optional message to include in the ping + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PingRequest) Reset() { + *x = PingRequest{} + mi := &file_stovepipe_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PingRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PingRequest) ProtoMessage() {} + +func (x *PingRequest) ProtoReflect() protoreflect.Message { + mi := &file_stovepipe_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PingRequest.ProtoReflect.Descriptor instead. +func (*PingRequest) Descriptor() ([]byte, []int) { + return file_stovepipe_proto_rawDescGZIP(), []int{0} +} + +func (x *PingRequest) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +// PingResponse is the response for the Ping method +type PingResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // The response message + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` + // The service name that handled the request + ServiceName string `protobuf:"bytes,2,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"` + // Timestamp of when the ping was received + Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + // Hostname of the server that handled the request + Hostname string `protobuf:"bytes,4,opt,name=hostname,proto3" json:"hostname,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PingResponse) Reset() { + *x = PingResponse{} + mi := &file_stovepipe_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PingResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PingResponse) ProtoMessage() {} + +func (x *PingResponse) ProtoReflect() protoreflect.Message { + mi := &file_stovepipe_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PingResponse.ProtoReflect.Descriptor instead. +func (*PingResponse) Descriptor() ([]byte, []int) { + return file_stovepipe_proto_rawDescGZIP(), []int{1} +} + +func (x *PingResponse) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +func (x *PingResponse) GetServiceName() string { + if x != nil { + return x.ServiceName + } + return "" +} + +func (x *PingResponse) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *PingResponse) GetHostname() string { + if x != nil { + return x.Hostname + } + return "" +} + +var File_stovepipe_proto protoreflect.FileDescriptor + +const file_stovepipe_proto_rawDesc = "" + + "\n" + + "\x0fstovepipe.proto\x12\x1auber.submitqueue.stovepipe\"'\n" + + "\vPingRequest\x12\x18\n" + + "\amessage\x18\x01 \x01(\tR\amessage\"\x85\x01\n" + + "\fPingResponse\x12\x18\n" + + "\amessage\x18\x01 \x01(\tR\amessage\x12!\n" + + "\fservice_name\x18\x02 \x01(\tR\vserviceName\x12\x1c\n" + + "\ttimestamp\x18\x03 \x01(\x03R\ttimestamp\x12\x1a\n" + + "\bhostname\x18\x04 \x01(\tR\bhostname2s\n" + + "\x14SubmitQueueStovepipe\x12[\n" + + "\x04Ping\x12'.uber.submitqueue.stovepipe.PingRequest\x1a(.uber.submitqueue.stovepipe.PingResponse\"\x00Ba\n" + + "\x1ecom.uber.submitqueue.stovepipeB\x0eStovepipeProtoP\x01Z-github.com/uber/submitqueue/stovepipe/protopbb\x06proto3" + +var ( + file_stovepipe_proto_rawDescOnce sync.Once + file_stovepipe_proto_rawDescData []byte +) + +func file_stovepipe_proto_rawDescGZIP() []byte { + file_stovepipe_proto_rawDescOnce.Do(func() { + file_stovepipe_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_stovepipe_proto_rawDesc), len(file_stovepipe_proto_rawDesc))) + }) + return file_stovepipe_proto_rawDescData +} + +var file_stovepipe_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_stovepipe_proto_goTypes = []any{ + (*PingRequest)(nil), // 0: uber.submitqueue.stovepipe.PingRequest + (*PingResponse)(nil), // 1: uber.submitqueue.stovepipe.PingResponse +} +var file_stovepipe_proto_depIdxs = []int32{ + 0, // 0: uber.submitqueue.stovepipe.SubmitQueueStovepipe.Ping:input_type -> uber.submitqueue.stovepipe.PingRequest + 1, // 1: uber.submitqueue.stovepipe.SubmitQueueStovepipe.Ping:output_type -> uber.submitqueue.stovepipe.PingResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_stovepipe_proto_init() } +func file_stovepipe_proto_init() { + if File_stovepipe_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_stovepipe_proto_rawDesc), len(file_stovepipe_proto_rawDesc)), + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_stovepipe_proto_goTypes, + DependencyIndexes: file_stovepipe_proto_depIdxs, + MessageInfos: file_stovepipe_proto_msgTypes, + }.Build() + File_stovepipe_proto = out.File + file_stovepipe_proto_goTypes = nil + file_stovepipe_proto_depIdxs = nil +} diff --git a/stovepipe/protopb/stovepipe.pb.yarpc.go b/stovepipe/protopb/stovepipe.pb.yarpc.go new file mode 100644 index 00000000..795ba9cf --- /dev/null +++ b/stovepipe/protopb/stovepipe.pb.yarpc.go @@ -0,0 +1,261 @@ +// Code generated by protoc-gen-yarpc-go. DO NOT EDIT. +// source: stovepipe.proto + +package protopb + +import ( + "context" + "io/ioutil" + "reflect" + + "github.com/gogo/protobuf/jsonpb" + "github.com/gogo/protobuf/proto" + "go.uber.org/fx" + "go.uber.org/yarpc" + "go.uber.org/yarpc/api/transport" + "go.uber.org/yarpc/api/x/restriction" + "go.uber.org/yarpc/encoding/protobuf" + "go.uber.org/yarpc/encoding/protobuf/reflection" +) + +var _ = ioutil.NopCloser + +// SubmitQueueStovepipeYARPCClient is the YARPC client-side interface for the SubmitQueueStovepipe service. +type SubmitQueueStovepipeYARPCClient interface { + Ping(context.Context, *PingRequest, ...yarpc.CallOption) (*PingResponse, error) +} + +func newSubmitQueueStovepipeYARPCClient(clientConfig transport.ClientConfig, anyResolver jsonpb.AnyResolver, options ...protobuf.ClientOption) SubmitQueueStovepipeYARPCClient { + return &_SubmitQueueStovepipeYARPCCaller{protobuf.NewStreamClient( + protobuf.ClientParams{ + ServiceName: "uber.submitqueue.stovepipe.SubmitQueueStovepipe", + ClientConfig: clientConfig, + AnyResolver: anyResolver, + Options: options, + }, + )} +} + +// NewSubmitQueueStovepipeYARPCClient builds a new YARPC client for the SubmitQueueStovepipe service. +func NewSubmitQueueStovepipeYARPCClient(clientConfig transport.ClientConfig, options ...protobuf.ClientOption) SubmitQueueStovepipeYARPCClient { + return newSubmitQueueStovepipeYARPCClient(clientConfig, nil, options...) +} + +// SubmitQueueStovepipeYARPCServer is the YARPC server-side interface for the SubmitQueueStovepipe service. +type SubmitQueueStovepipeYARPCServer interface { + Ping(context.Context, *PingRequest) (*PingResponse, error) +} + +type buildSubmitQueueStovepipeYARPCProceduresParams struct { + Server SubmitQueueStovepipeYARPCServer + AnyResolver jsonpb.AnyResolver +} + +func buildSubmitQueueStovepipeYARPCProcedures(params buildSubmitQueueStovepipeYARPCProceduresParams) []transport.Procedure { + handler := &_SubmitQueueStovepipeYARPCHandler{params.Server} + return protobuf.BuildProcedures( + protobuf.BuildProceduresParams{ + ServiceName: "uber.submitqueue.stovepipe.SubmitQueueStovepipe", + UnaryHandlerParams: []protobuf.BuildProceduresUnaryHandlerParams{ + { + MethodName: "Ping", + Handler: protobuf.NewUnaryHandler( + protobuf.UnaryHandlerParams{ + Handle: handler.Ping, + NewRequest: newSubmitQueueStovepipeServicePingYARPCRequest, + AnyResolver: params.AnyResolver, + }, + ), + }, + }, + OnewayHandlerParams: []protobuf.BuildProceduresOnewayHandlerParams{}, + StreamHandlerParams: []protobuf.BuildProceduresStreamHandlerParams{}, + }, + ) +} + +// BuildSubmitQueueStovepipeYARPCProcedures prepares an implementation of the SubmitQueueStovepipe service for YARPC registration. +func BuildSubmitQueueStovepipeYARPCProcedures(server SubmitQueueStovepipeYARPCServer) []transport.Procedure { + return buildSubmitQueueStovepipeYARPCProcedures(buildSubmitQueueStovepipeYARPCProceduresParams{Server: server}) +} + +// FxSubmitQueueStovepipeYARPCClientParams defines the input +// for NewFxSubmitQueueStovepipeYARPCClient. It provides the +// paramaters to get a SubmitQueueStovepipeYARPCClient in an +// Fx application. +type FxSubmitQueueStovepipeYARPCClientParams struct { + fx.In + + Provider yarpc.ClientConfig + AnyResolver jsonpb.AnyResolver `name:"yarpcfx" optional:"true"` + Restriction restriction.Checker `optional:"true"` +} + +// FxSubmitQueueStovepipeYARPCClientResult defines the output +// of NewFxSubmitQueueStovepipeYARPCClient. It provides a +// SubmitQueueStovepipeYARPCClient to an Fx application. +type FxSubmitQueueStovepipeYARPCClientResult struct { + fx.Out + + Client SubmitQueueStovepipeYARPCClient + + // We are using an fx.Out struct here instead of just returning a client + // so that we can add more values or add named versions of the client in + // the future without breaking any existing code. +} + +// NewFxSubmitQueueStovepipeYARPCClient provides a SubmitQueueStovepipeYARPCClient +// to an Fx application using the given name for routing. +// +// fx.Provide( +// protopb.NewFxSubmitQueueStovepipeYARPCClient("service-name"), +// ... +// ) +func NewFxSubmitQueueStovepipeYARPCClient(name string, options ...protobuf.ClientOption) interface{} { + return func(params FxSubmitQueueStovepipeYARPCClientParams) FxSubmitQueueStovepipeYARPCClientResult { + cc := params.Provider.ClientConfig(name) + + if params.Restriction != nil { + if namer, ok := cc.GetUnaryOutbound().(transport.Namer); ok { + if err := params.Restriction.Check(protobuf.Encoding, namer.TransportName()); err != nil { + panic(err.Error()) + } + } + } + + return FxSubmitQueueStovepipeYARPCClientResult{ + Client: newSubmitQueueStovepipeYARPCClient(cc, params.AnyResolver, options...), + } + } +} + +// FxSubmitQueueStovepipeYARPCProceduresParams defines the input +// for NewFxSubmitQueueStovepipeYARPCProcedures. It provides the +// paramaters to get SubmitQueueStovepipeYARPCServer procedures in an +// Fx application. +type FxSubmitQueueStovepipeYARPCProceduresParams struct { + fx.In + + Server SubmitQueueStovepipeYARPCServer + AnyResolver jsonpb.AnyResolver `name:"yarpcfx" optional:"true"` +} + +// FxSubmitQueueStovepipeYARPCProceduresResult defines the output +// of NewFxSubmitQueueStovepipeYARPCProcedures. It provides +// SubmitQueueStovepipeYARPCServer procedures to an Fx application. +// +// The procedures are provided to the "yarpcfx" value group. +// Dig 1.2 or newer must be used for this feature to work. +type FxSubmitQueueStovepipeYARPCProceduresResult struct { + fx.Out + + Procedures []transport.Procedure `group:"yarpcfx"` + ReflectionMeta reflection.ServerMeta `group:"yarpcfx"` +} + +// NewFxSubmitQueueStovepipeYARPCProcedures provides SubmitQueueStovepipeYARPCServer procedures to an Fx application. +// It expects a SubmitQueueStovepipeYARPCServer to be present in the container. +// +// fx.Provide( +// protopb.NewFxSubmitQueueStovepipeYARPCProcedures(), +// ... +// ) +func NewFxSubmitQueueStovepipeYARPCProcedures() interface{} { + return func(params FxSubmitQueueStovepipeYARPCProceduresParams) FxSubmitQueueStovepipeYARPCProceduresResult { + return FxSubmitQueueStovepipeYARPCProceduresResult{ + Procedures: buildSubmitQueueStovepipeYARPCProcedures(buildSubmitQueueStovepipeYARPCProceduresParams{ + Server: params.Server, + AnyResolver: params.AnyResolver, + }), + ReflectionMeta: SubmitQueueStovepipeReflectionMeta, + } + } +} + +// SubmitQueueStovepipeReflectionMeta is the reflection server metadata +// required for using the gRPC reflection protocol with YARPC. +// +// See https://github.com/grpc/grpc/blob/master/doc/server-reflection.md. +var SubmitQueueStovepipeReflectionMeta = reflection.ServerMeta{ + ServiceName: "uber.submitqueue.stovepipe.SubmitQueueStovepipe", + FileDescriptors: yarpcFileDescriptorClosurefabdb6b3c0b09022, +} + +type _SubmitQueueStovepipeYARPCCaller struct { + streamClient protobuf.StreamClient +} + +func (c *_SubmitQueueStovepipeYARPCCaller) Ping(ctx context.Context, request *PingRequest, options ...yarpc.CallOption) (*PingResponse, error) { + responseMessage, err := c.streamClient.Call(ctx, "Ping", request, newSubmitQueueStovepipeServicePingYARPCResponse, options...) + if responseMessage == nil { + return nil, err + } + response, ok := responseMessage.(*PingResponse) + if !ok { + return nil, protobuf.CastError(emptySubmitQueueStovepipeServicePingYARPCResponse, responseMessage) + } + return response, err +} + +type _SubmitQueueStovepipeYARPCHandler struct { + server SubmitQueueStovepipeYARPCServer +} + +func (h *_SubmitQueueStovepipeYARPCHandler) Ping(ctx context.Context, requestMessage proto.Message) (proto.Message, error) { + var request *PingRequest + var ok bool + if requestMessage != nil { + request, ok = requestMessage.(*PingRequest) + if !ok { + return nil, protobuf.CastError(emptySubmitQueueStovepipeServicePingYARPCRequest, requestMessage) + } + } + response, err := h.server.Ping(ctx, request) + if response == nil { + return nil, err + } + return response, err +} + +func newSubmitQueueStovepipeServicePingYARPCRequest() proto.Message { + return &PingRequest{} +} + +func newSubmitQueueStovepipeServicePingYARPCResponse() proto.Message { + return &PingResponse{} +} + +var ( + emptySubmitQueueStovepipeServicePingYARPCRequest = &PingRequest{} + emptySubmitQueueStovepipeServicePingYARPCResponse = &PingResponse{} +) + +var yarpcFileDescriptorClosurefabdb6b3c0b09022 = [][]byte{ + // stovepipe.proto + []byte{ + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x91, 0xb1, 0x4e, 0xc3, 0x30, + 0x10, 0x86, 0x31, 0xad, 0x80, 0x5e, 0x2b, 0x90, 0x2c, 0x86, 0x28, 0x42, 0xa8, 0x64, 0x69, 0x16, + 0x1c, 0x09, 0xde, 0xa0, 0x0f, 0x80, 0x42, 0xba, 0xc1, 0x80, 0xec, 0xe8, 0x94, 0x7a, 0x70, 0xec, + 0xe6, 0xec, 0xbe, 0x01, 0xef, 0x8d, 0xe2, 0xd2, 0xd0, 0x25, 0x74, 0xf3, 0x9d, 0xff, 0x4f, 0xfa, + 0x3f, 0x1b, 0xee, 0xc8, 0xdb, 0x3d, 0x3a, 0xed, 0x50, 0xb8, 0xce, 0x7a, 0xcb, 0xd3, 0xa0, 0xb0, + 0x13, 0x14, 0x94, 0xd1, 0x7e, 0x17, 0x30, 0xa0, 0x18, 0x12, 0xd9, 0x0a, 0xe6, 0xa5, 0x6e, 0x9b, + 0x0a, 0x77, 0x01, 0xc9, 0xf3, 0x04, 0xae, 0x0d, 0x12, 0xc9, 0x06, 0x13, 0xb6, 0x64, 0xf9, 0xac, + 0x3a, 0x8e, 0xd9, 0x37, 0x83, 0xc5, 0x21, 0x49, 0xce, 0xb6, 0x84, 0xe3, 0x51, 0xfe, 0x04, 0x0b, + 0xc2, 0x6e, 0xaf, 0x6b, 0xfc, 0x6a, 0xa5, 0xc1, 0xe4, 0x32, 0x5e, 0xcf, 0x7f, 0x77, 0x6f, 0xd2, + 0x20, 0x7f, 0x80, 0x99, 0xd7, 0x06, 0xc9, 0x4b, 0xe3, 0x92, 0xc9, 0x92, 0xe5, 0x93, 0xea, 0x6f, + 0xc1, 0x53, 0xb8, 0xd9, 0x5a, 0xf2, 0x11, 0x9e, 0x46, 0x78, 0x98, 0x5f, 0x08, 0xee, 0x37, 0xd1, + 0xe4, 0xbd, 0x37, 0xd9, 0x1c, 0x45, 0xf8, 0x27, 0x4c, 0xfb, 0x7a, 0x7c, 0x25, 0xc6, 0x6d, 0xc5, + 0x89, 0x6a, 0x9a, 0x9f, 0x0f, 0x1e, 0x4c, 0xb3, 0x8b, 0xb5, 0x84, 0xc7, 0xda, 0x9a, 0x7f, 0x80, + 0xf5, 0xed, 0xd0, 0xa4, 0xec, 0xdf, 0xbc, 0x64, 0x1f, 0xcf, 0x8d, 0xf6, 0xdb, 0xa0, 0x44, 0x6d, + 0x4d, 0xd1, 0x83, 0xc5, 0x09, 0x58, 0x0c, 0x60, 0x11, 0xbf, 0xc8, 0x29, 0x75, 0x15, 0x0f, 0xaf, + 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x5c, 0xd3, 0x2a, 0x7a, 0xbe, 0x01, 0x00, 0x00, + }, +} + +func init() { + yarpc.RegisterClientBuilder( + func(clientConfig transport.ClientConfig, structField reflect.StructField) SubmitQueueStovepipeYARPCClient { + return NewSubmitQueueStovepipeYARPCClient(clientConfig, protobuf.ClientBuilderOptions(clientConfig, structField)...) + }, + ) +} diff --git a/stovepipe/protopb/stovepipe_grpc.pb.go b/stovepipe/protopb/stovepipe_grpc.pb.go new file mode 100644 index 00000000..2bbce1bc --- /dev/null +++ b/stovepipe/protopb/stovepipe_grpc.pb.go @@ -0,0 +1,142 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v5.29.3 +// source: stovepipe.proto + +package protopb + +import ( + context "context" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + SubmitQueueStovepipe_Ping_FullMethodName = "/uber.submitqueue.stovepipe.SubmitQueueStovepipe/Ping" +) + +// SubmitQueueStovepipeClient is the client API for SubmitQueueStovepipe service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// SubmitQueueStovepipe provides the stovepipe API +type SubmitQueueStovepipeClient interface { + // Ping returns a response indicating the service is alive + Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) +} + +type submitQueueStovepipeClient struct { + cc grpc.ClientConnInterface +} + +func NewSubmitQueueStovepipeClient(cc grpc.ClientConnInterface) SubmitQueueStovepipeClient { + return &submitQueueStovepipeClient{cc} +} + +func (c *submitQueueStovepipeClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(PingResponse) + err := c.cc.Invoke(ctx, SubmitQueueStovepipe_Ping_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// SubmitQueueStovepipeServer is the server API for SubmitQueueStovepipe service. +// All implementations must embed UnimplementedSubmitQueueStovepipeServer +// for forward compatibility. +// +// SubmitQueueStovepipe provides the stovepipe API +type SubmitQueueStovepipeServer interface { + // Ping returns a response indicating the service is alive + Ping(context.Context, *PingRequest) (*PingResponse, error) + mustEmbedUnimplementedSubmitQueueStovepipeServer() +} + +// UnimplementedSubmitQueueStovepipeServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedSubmitQueueStovepipeServer struct{} + +func (UnimplementedSubmitQueueStovepipeServer) Ping(context.Context, *PingRequest) (*PingResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") +} +func (UnimplementedSubmitQueueStovepipeServer) mustEmbedUnimplementedSubmitQueueStovepipeServer() {} +func (UnimplementedSubmitQueueStovepipeServer) testEmbeddedByValue() {} + +// UnsafeSubmitQueueStovepipeServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SubmitQueueStovepipeServer will +// result in compilation errors. +type UnsafeSubmitQueueStovepipeServer interface { + mustEmbedUnimplementedSubmitQueueStovepipeServer() +} + +func RegisterSubmitQueueStovepipeServer(s grpc.ServiceRegistrar, srv SubmitQueueStovepipeServer) { + // If the following call pancis, it indicates UnimplementedSubmitQueueStovepipeServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&SubmitQueueStovepipe_ServiceDesc, srv) +} + +func _SubmitQueueStovepipe_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PingRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SubmitQueueStovepipeServer).Ping(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: SubmitQueueStovepipe_Ping_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SubmitQueueStovepipeServer).Ping(ctx, req.(*PingRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// SubmitQueueStovepipe_ServiceDesc is the grpc.ServiceDesc for SubmitQueueStovepipe service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var SubmitQueueStovepipe_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "uber.submitqueue.stovepipe.SubmitQueueStovepipe", + HandlerType: (*SubmitQueueStovepipeServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Ping", + Handler: _SubmitQueueStovepipe_Ping_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "stovepipe.proto", +} diff --git a/test/integration/stovepipe/BUILD.bazel b/test/integration/stovepipe/BUILD.bazel new file mode 100644 index 00000000..3121c0b0 --- /dev/null +++ b/test/integration/stovepipe/BUILD.bazel @@ -0,0 +1,20 @@ +load("@rules_go//go:def.bzl", "go_test") + +go_test( + name = "stovepipe_test", + srcs = ["suite_test.go"], + data = [ + "//:MODULE.bazel", + "//:go.mod", + "//example/server/stovepipe:docker-compose.yml", + ], + tags = ["integration"], + deps = [ + "//stovepipe/protopb", + "//test/testutil", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_stretchr_testify//suite", + "@org_golang_google_grpc//:grpc", + ], +) diff --git a/test/integration/stovepipe/suite_test.go b/test/integration/stovepipe/suite_test.go new file mode 100644 index 00000000..262f2502 --- /dev/null +++ b/test/integration/stovepipe/suite_test.go @@ -0,0 +1,97 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stovepipe + +// Stovepipe Integration Tests +// +// These tests use docker-compose from example/server/stovepipe/docker-compose.yml +// which requires a pre-built Linux binary. +// +// Run with make target (builds binary + runs test): +// make integration-test-stovepipe + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + pb "github.com/uber/submitqueue/stovepipe/protopb" + "github.com/uber/submitqueue/test/testutil" + "google.golang.org/grpc" +) + +type StovepipeIntegrationSuite struct { + suite.Suite + ctx context.Context + log *testutil.TestLogger + stack *testutil.ComposeStack + client pb.SubmitQueueStovepipeClient +} + +func TestStovepipeIntegration(t *testing.T) { + suite.Run(t, new(StovepipeIntegrationSuite)) +} + +func (s *StovepipeIntegrationSuite) SetupSuite() { + t := s.T() + s.ctx = context.Background() + s.log = testutil.NewTestLogger(t) + + s.log.Logf("Starting Stovepipe integration test suite using docker-compose") + + // Set REPO_ROOT for docker-compose volume mounts and build context + repoRoot := testutil.FindRepoRoot(t) + t.Setenv("REPO_ROOT", repoRoot) + + // Use docker-compose from example/server/stovepipe + // NOTE: Assumes Linux binary is pre-built via make target + composeFile := filepath.Join(repoRoot, "example/server/stovepipe/docker-compose.yml") + s.stack = testutil.NewComposeStack(t, s.log, s.ctx, composeFile, "svc-stovepipe") + + // Start the compose stack (Stovepipe only — stateless service, no DBs) + err := s.stack.Up() + require.NoError(t, err, "failed to start compose stack") + + s.log.Logf("Compose stack started successfully") + + // Connect to Stovepipe gRPC service + var conn *grpc.ClientConn + conn, err = s.stack.ConnectGRPC("stovepipe-service", 8080) + require.NoError(t, err, "failed to connect to stovepipe") + s.client = pb.NewSubmitQueueStovepipeClient(conn) + + s.log.Logf("Stovepipe integration test suite ready") +} + +func (s *StovepipeIntegrationSuite) TearDownSuite() { + s.log.Logf("Tearing down Stovepipe integration test suite") + // Cleanup handled automatically by testutil.ComposeStack +} + +// TestPingAPI tests the Stovepipe Ping API +func (s *StovepipeIntegrationSuite) TestPingAPI() { + t := s.T() + + resp, err := s.client.Ping(s.ctx, &pb.PingRequest{Message: "integration test"}) + require.NoError(t, err, "Stovepipe Ping failed") + assert.Equal(t, "stovepipe", resp.ServiceName) + assert.NotEmpty(t, resp.Message) + assert.NotZero(t, resp.Timestamp) + + s.log.Logf("Stovepipe Ping test passed: %s", resp.Message) +}