From 31ce3a23e67c8799ff5f17ebf8ddc52436a6a0ab Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Sat, 21 Feb 2026 22:47:03 -0800 Subject: [PATCH] feat: add orchestrator pipeline, TopicRegistry, and CI improvements - Add TopicRegistry to centralize queue topic configuration and decouple controllers from subscription details - Refactor consumer to accept TopicRegistry instead of raw queue + subscriber name - Add request controller to orchestrator pipeline with publisher for forwarding to next stage - Generate gomock mocks for consumer, queue, publisher, and subscriber interfaces - Simplify server startup by removing retry loops (rely on Docker Compose healthchecks via --wait) - Auto-tail container logs to stderr during integration/e2e tests - Fix CI Bazel caching: use disk-cache + repo-cache from .bazelrc, per-job cache keys to avoid save races - Stream test output in real-time (--test_output=streamed) - Remove redundant .github/actions/logs (logs now stream automatically) - Consolidate PROJECT_STRUCTURE.md into CLAUDE.md Co-Authored-By: Claude Opus 4.6 --- .bazelrc | 1 + .github/actions/logs/action.yml | 19 - .github/actions/run-bazel-test/action.yml | 7 +- .github/actions/setup/action.yml | 21 +- .github/workflows/ci.yml | 6 - CLAUDE.md | 263 +++---- Makefile | 10 +- core/consumer/BUILD.bazel | 5 + core/consumer/consumer.go | 73 +- core/consumer/consumer_test.go | 702 +++++++----------- core/consumer/controller.go | 9 +- core/consumer/mock/BUILD.bazel | 13 + core/consumer/mock/controller_mock.go | 207 ++++++ core/consumer/registry.go | 80 ++ core/consumer/registry_test.go | 194 +++++ doc/PROJECT_STRUCTURE.md | 178 ----- doc/howto/TESTING.md | 22 +- example/server/gateway/BUILD.bazel | 3 +- example/server/gateway/main.go | 112 +-- example/server/orchestrator/main.go | 137 ++-- extension/queue/mock/BUILD.bazel | 8 +- extension/queue/mock/publisher.go | 70 ++ extension/queue/mock/queue.go | 83 +++ extension/queue/mock/subscriber.go | 71 ++ extension/queue/publisher.go | 2 + extension/queue/queue.go | 2 + extension/queue/sql/subscriber_test.go | 2 +- extension/queue/subscriber.go | 2 + extension/queue/subscription_config.go | 6 +- extension/queue/subscription_config_test.go | 23 +- extension/storage/mysql/storage.go | 36 +- gateway/controller/land.go | 9 +- gateway/controller/land_test.go | 24 +- orchestrator/controller/request/BUILD.bazel | 3 +- orchestrator/controller/request/request.go | 92 ++- .../controller/request/request_test.go | 131 ++-- .../extension/queue/sql/queue_test.go | 40 +- .../extension/storage/mysql/storage_test.go | 10 +- test/testutil/compose.go | 46 +- 39 files changed, 1498 insertions(+), 1224 deletions(-) delete mode 100644 .github/actions/logs/action.yml create mode 100644 core/consumer/mock/BUILD.bazel create mode 100644 core/consumer/mock/controller_mock.go create mode 100644 core/consumer/registry.go create mode 100644 core/consumer/registry_test.go delete mode 100644 doc/PROJECT_STRUCTURE.md create mode 100644 extension/queue/mock/publisher.go create mode 100644 extension/queue/mock/queue.go create mode 100644 extension/queue/mock/subscriber.go diff --git a/.bazelrc b/.bazelrc index 75e4e964..d5c30b39 100644 --- a/.bazelrc +++ b/.bazelrc @@ -1 +1,2 @@ common --disk_cache=~/.cache/bazel-disk-cache +common --repository_cache=~/.cache/bazel-repo-cache diff --git a/.github/actions/logs/action.yml b/.github/actions/logs/action.yml deleted file mode 100644 index acf6d8d2..00000000 --- a/.github/actions/logs/action.yml +++ /dev/null @@ -1,19 +0,0 @@ -name: Logs -description: Display container logs on test failure - -runs: - using: composite - steps: - - name: Display container logs on failure - if: failure() - shell: bash - run: | - echo "=== Listing all Docker containers ===" - docker ps -a --filter "name=sq-test-" || true - echo "" - echo "=== Dumping container logs ===" - for container in $(docker ps -a --filter "name=sq-test-" --format "{{.Names}}"); do - echo ">>> Logs for $container <<<" - docker logs "$container" 2>&1 || true - echo "" - done diff --git a/.github/actions/run-bazel-test/action.yml b/.github/actions/run-bazel-test/action.yml index f6efd4ef..2a3cb6f7 100644 --- a/.github/actions/run-bazel-test/action.yml +++ b/.github/actions/run-bazel-test/action.yml @@ -1,5 +1,5 @@ name: Run Bazel Test -description: Run Bazel test and show logs on failure +description: Run a Bazel test target inputs: target: @@ -11,7 +11,4 @@ runs: steps: - name: Run Bazel test shell: bash - run: ./tool/bazel test ${{ inputs.target }} --test_output=errors - - - uses: ./.github/actions/logs - if: always() + run: ./tool/bazel test ${{ inputs.target }} --test_output=streamed diff --git a/.github/actions/setup/action.yml b/.github/actions/setup/action.yml index c2130e70..9cfe01db 100644 --- a/.github/actions/setup/action.yml +++ b/.github/actions/setup/action.yml @@ -1,5 +1,17 @@ name: Setup -description: Setup Bazel cache for CI jobs +description: Setup Bazel caches for CI jobs + +# Caches three directories that match the paths configured in .bazelrc: +# - disk-cache: compiled build outputs (biggest speedup — avoids recompilation) +# - repo-cache: downloaded external dependencies (avoids re-downloading Go modules, protobuf, etc.) +# - bazelisk: the Bazel binary itself +# +# Cache key includes github.job so each CI job saves its own cache entry +# (otherwise concurrent jobs race on the same key and only one wins). +# github.run_id ensures the key never matches exactly, forcing actions/cache +# to always save — so the disk cache accumulates entries across runs. +# The restore-keys fallback restores from the most recent prior run of the +# same job (preferred), then same deps, then any. runs: using: composite @@ -8,8 +20,11 @@ runs: uses: actions/cache@v4 with: path: | - ~/.cache/bazel + ~/.cache/bazel-disk-cache + ~/.cache/bazel-repo-cache ~/.cache/bazelisk - key: bazel-${{ runner.os }}-${{ hashFiles('MODULE.bazel', 'go.mod', '.bazelversion') }} + key: bazel-${{ runner.os }}-${{ github.job }}-${{ hashFiles('MODULE.bazel', 'go.mod', 'go.sum', '.bazelversion', '.bazelrc') }}-${{ github.run_id }} restore-keys: | + bazel-${{ runner.os }}-${{ github.job }}-${{ hashFiles('MODULE.bazel', 'go.mod', 'go.sum', '.bazelversion', '.bazelrc') }}- + bazel-${{ runner.os }}-${{ github.job }}- bazel-${{ runner.os }}- diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1ac9c6d9..ab028385 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -59,8 +59,6 @@ jobs: - name: Run E2E tests run: make e2e-test - - uses: ./.github/actions/logs - gateway-integration-test: name: Gateway Integration Test runs-on: ubuntu-latest @@ -71,8 +69,6 @@ jobs: - name: Run Gateway integration tests run: make integration-test-gateway - - uses: ./.github/actions/logs - orchestrator-integration-test: name: Orchestrator Integration Test runs-on: ubuntu-latest @@ -83,8 +79,6 @@ jobs: - name: Run Orchestrator integration tests run: make integration-test-orchestrator - - uses: ./.github/actions/logs - # --------------------------------------------------------------------------- # EXTENSION TESTS # --------------------------------------------------------------------------- diff --git a/CLAUDE.md b/CLAUDE.md index b9528d5c..68626bf2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -36,13 +36,31 @@ func (r Request) WithStatus(status Status) Request { ## Architecture -### Services +### Project Layout + +``` +submitqueue/ +├── gateway/ # Gateway service (port 8081) - entry point +├── orchestrator/ # Orchestrator service (port 8082) - coordinates jobs +├── entity/ # Domain entities (Request, Change, enums) +│ └── queue/ # Queue-specific entities (Message) +├── extension/ # Pluggable backend implementations +│ ├── counter/ # Sequential number generation (interface + mysql/) +│ ├── queue/ # Messaging queue abstraction (interface + sql/) +│ └── storage/ # Storage abstraction (interface + mysql/) +├── core/ +│ └── consumer/ # Reusable queue consumer infrastructure +├── example/server/ # Runnable servers with Docker Compose +├── test/ +│ ├── e2e/ # End-to-end tests (full stack) +│ ├── integration/ # Integration tests (per-service + extensions) +│ └── testutil/ # Test utilities (ComposeStack, MySQL helpers) +└── doc/ # Documentation +``` -Three services, each following the same layout: +### Services -- **Gateway** (port 8081): Entry point for external requests -- **Orchestrator** (port 8082): Coordinates job execution -- **Speculator** (port 8083): Performs speculative builds +Each service follows the same layout: ``` / @@ -50,7 +68,7 @@ Three services, each following the same layout: │ ├── {method}.go # RPC controllers (e.g., land.go, ping.go) │ ├── {method}_test.go │ └── {step}/ # Queue message controllers (e.g., request/) -│ ├── {step}.go # Step in workflow +│ ├── {step}.go │ └── {step}_test.go ├── proto/ # Proto definitions (.proto files) └── protopb/ # Generated proto code (committed to repo) @@ -58,79 +76,43 @@ Three services, each following the same layout: ### Controllers -Controllers contain pure business logic, independent of infrastructure. There are two types: - -**RPC Controllers** - Handle synchronous API requests in `{service}/controller/`. Accept protobuf types, independent of gRPC/YARPC transport. +Two types, both containing pure business logic independent of infrastructure: +**RPC Controllers** — in `{service}/controller/`, accept protobuf types: ```go func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error) ``` -**Queue Message Controllers** - Process async queue messages in `{service}/controller/{step}/`. Implement `consumer.Controller` interface. - +**Queue Message Controllers** — in `{service}/controller/{step}/`, implement `consumer.Controller`: ```go -// Receives consumer.Delivery (NOT extension/queue.Delivery) -func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - // Return nil to ack, error to nack. Consumer handles ack/nack automatically. -} +// Return nil to ack, error to nack. Consumer handles ack/nack automatically. +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error ``` -Controllers receive `consumer.Delivery` (subset interface without Ack/Nack methods) to enforce separation: controllers do business logic, consumer framework handles infrastructure. +Controllers receive `consumer.Delivery` (subset interface without Ack/Nack) to enforce separation of business logic from infrastructure. ### Entities -Domain objects in `entity/`, organized by domain. Top-level entities live directly in `entity/`; domain-specific ones go in subdirectories. - -``` -entity/ -├── request.go # Request, Change, enums (RequestState, RequestLandStrategy) -└── queue/ - └── message.go # Message entity -``` - -**Entity guidelines:** -1. Keep entities pure and framework-agnostic — no external dependencies -2. Use value types, not references -3. Prefer `int64` milliseconds over `time.Time` and `time.Duration`: - - Timestamps: Unix epoch milliseconds (e.g., `CreatedAt int64`) — use `time.UnixMilli()` method - - Durations/timeouts: milliseconds (e.g., `TimeoutMs int64`, `DelayMs int64`) - - Use `time.Duration(ms) * time.Millisecond` to convert to `time.Duration` when needed -4. Every field must have a comment explaining its meaning +Domain objects in `entity/`, organized by domain. Guidelines: +1. Pure and framework-agnostic — no external dependencies +2. Value types, not references +3. `int64` milliseconds for timestamps (`CreatedAt int64`) and durations (`TimeoutMs int64`) +4. Every field must have a comment 5. Reference other entities by ID (string or int), not directly -6. Use string enums with clear names; assign sentinel values (`""` for strings, `0` for ints) to unreachable/unknown enum variants +6. String enums with sentinel values (`""` for unknown) ### Extensions -Extensions are **vendor-agnostic, pluggable interfaces** for backend implementations. Each defines interfaces at the top level with implementations in subdirectories. - -``` -extension/ -├── counter/ # Atomic sequential number generation -│ ├── counter.go # Counter interface -│ └── mysql/ # MySQL implementation -├── queue/ # Messaging queue abstraction -│ ├── queue.go # Queue (factory) interface -│ ├── publisher.go # Publisher interface -│ ├── subscriber.go # Subscriber interface -│ ├── delivery.go # Delivery interface -│ └── sql/ # SQL (MySQL) implementation -└── storage/ # Storage abstraction - ├── storage.go # Storage (factory) interface + sentinel errors - ├── request_store.go # RequestStore interface - └── mysql/ # MySQL implementation -``` - -**Extension pattern:** -1. Define vendor-agnostic interfaces at `extension/{ext}/` -2. Implementations go in `extension/{ext}/{impl}/` -3. Most extensions use a Factory interface for dependency injection and lifecycle management -4. Include a README.md documenting interfaces and usage +Vendor-agnostic, pluggable interfaces with implementations in subdirectories: +1. Define interfaces at `extension/{ext}/` +2. Implementations at `extension/{ext}/{impl}/` +3. Factory interface for dependency injection and lifecycle management ### Import Paths - RPC Controllers: `github.com/uber/submitqueue/{service}/controller` - Queue Controllers: `github.com/uber/submitqueue/{service}/controller/{step}` -- Consumer: `github.com/uber/submitqueue/consumer` +- Consumer: `github.com/uber/submitqueue/core/consumer` - Proto (generated): `github.com/uber/submitqueue/{service}/protopb` - Extensions: `github.com/uber/submitqueue/extension/{extension}` - Extension impl: `github.com/uber/submitqueue/extension/{extension}/{impl}` @@ -138,150 +120,85 @@ extension/ ## Development -### Directory Structure - -See [doc/PROJECT_STRUCTURE.md](doc/PROJECT_STRUCTURE.md) for detailed project organization and architecture. - ### Build System -This repository uses **Bazel with Bzlmod** (NOT WORKSPACE) for dependency management. +Bazel with Bzlmod (NOT WORKSPACE). -- **Version pinning**: `.bazelversion` pins the Bazel version -- **Dependencies**: Managed in `MODULE.bazel` (NOT a WORKSPACE file) -- **Go version**: Defined in `go.mod`, read by `MODULE.bazel` via `go_sdk.from_file()` -- **Bazel wrapper**: `./tool/bazel` (Bazelisk wrapper). With direnv (`.envrc`), use `bazel` directly. -- **External dependencies**: Must be added to both `go.mod` AND `MODULE.bazel` -- **BUILD files**: Every Go package must have a `BUILD.bazel` file -- **Gazelle**: Run `make gazelle` after adding/removing Go files to update BUILD files - - CI enforces BUILD files are in sync - will fail if `make gazelle` generates changes - - Always run `make gazelle` before committing +- **Dependencies**: `MODULE.bazel` + `go.mod` (both must be updated) +- **Bazel wrapper**: `./tool/bazel` (Bazelisk). With direnv (`.envrc`), use `bazel` directly. +- **BUILD files**: Every Go package needs `BUILD.bazel`. Run `make gazelle` after adding/removing Go files. +- **CI enforces** BUILD files are in sync — always run `make gazelle` before committing. ### Proto Generation -All generated proto files are **committed to the repository**. When modifying `.proto` files: - -1. Edit the `.proto` file in `{service}/proto/` -2. Run `make proto` to regenerate all three file types: `*.pb.go`, `*_grpc.pb.go`, `*.pb.yarpc.go` -3. Update controller implementations if needed -4. Commit all generated files - -### File Naming - -- Proto files: `{service}.proto` -- Controllers: `{method}.go` or `{feature}.go` -- Entities: `{entity}.go` -- Tests: `{file}_test.go` -- BUILD files: Always `BUILD.bazel` - -### Directory Naming +Generated proto files are committed. When modifying `.proto` files: +1. Edit in `{service}/proto/` +2. `make proto` (generates `*.pb.go`, `*_grpc.pb.go`, `*.pb.yarpc.go`) +3. Commit all generated files -- Use **singular** names for directories (e.g., `mock/` not `mocks/`, `entity/` not `entities/`) -- This applies to all folders including test mocks, extensions, entities, and service directories +### Naming Conventions -### Makefile Convention +- **Directories**: singular (`mock/`, `entity/`, not `mocks/`, `entities/`) +- **Files**: `{method}.go`, `{entity}.go`, `{file}_test.go`, `BUILD.bazel` +- **Proto files**: `{service}.proto` -The `Makefile` follows strict conventions for maintainability: +### Makefile -**Alphabetical ordering:** -- **Targets are alphabetically sorted** — makes it easy to find specific targets -- **`.PHONY` declaration** — lists all targets in alphabetical order -- **`help` target is always last** — exception to alphabetical ordering for discoverability -- When adding new targets, insert them in alphabetical order (not at the end) - -**Help text documentation:** -- **Add `## Description` after each target** — enables auto-generated help and shell completion -- Format: `target: ## Short description of what this target does` -- Example: `build: ## Build all services and examples` -- Run `make help` to see all documented targets with descriptions -- Shell completion (zsh) shows these descriptions when you press `` - -**Example target with help text:** +Targets are **alphabetically sorted**. Each target has `## Description` for auto-generated help and shell completion: ```makefile integration-test: build-all-linux ## Run all integration tests (auto-builds binaries) - @echo "Running all integration tests..." - @$(BAZEL) test //test/integration/... --test_output=errors + @$(BAZEL) test //test/integration/... --test_output=streamed ``` -This convention makes the Makefile self-documenting and enables powerful shell completion. - ### Common Make Targets ```bash -make build # Build all services -make proto # Regenerate proto files -make test # Run unit tests -make integration-test # Run all integration tests (Docker-based) -make integration-test-gateway # Test Gateway service -make e2e-test # Run end-to-end tests -make local-start # Start full stack with Docker Compose -make local-ps # Show running containers and ports -make local-logs # View logs from all services -make local-stop # Stop all services -make run-client-gateway # Test Gateway client (SERVER_ADDR, MESSAGE) -make run-client-orchestrator # Test Orchestrator client -make gazelle # Update BUILD.bazel files -make clean # Clean Bazel cache -make clean-proto # Remove generated proto files +make build # Build all services +make test # Run unit tests +make integration-test # Run all integration tests (Docker-based) +make e2e-test # Run end-to-end tests +make proto # Regenerate proto files +make gazelle # Update BUILD.bazel files +make local-start # Start full stack with Docker Compose +make local-ps # Show running containers and ports +make local-logs # View logs from all services +make local-stop # Stop all services +make clean # Clean Bazel cache ``` ### Common Workflows **Add new RPC method:** -1. Edit `{service}/proto/*.proto` -2. `make proto` -3. Add controller in `{service}/controller/` -4. Wire up in `example/server/{service}/main.go` +1. Edit `{service}/proto/*.proto` → `make proto` +2. Add controller in `{service}/controller/` +3. Wire up in `example/server/{service}/main.go` **Add new queue message controller:** -1. Create `{service}/controller/{step}/` with controller implementing `consumer.Controller` -2. Wire up in `example/server/{service}/main.go`: register → start → stop on shutdown +1. Create `{service}/controller/{step}/` implementing `consumer.Controller` +2. Wire up in `example/server/{service}/main.go` -**Add new extension implementation:** -1. Create `extension/{extension}/{impl}/` directory -2. Implement factory and core interfaces -3. Add `BUILD.bazel` -4. Add tests and document in README.md +**Add new extension:** +1. Create `extension/{ext}/{impl}/` with factory and interfaces +2. Add `BUILD.bazel`, tests, and README.md **Add new entity:** -1. Create `entity/{domain}/{entity}.go` with test file -2. Add `BUILD.bazel` with `go_library` and `go_test` targets - -### Testing Guidelines - -1. **Avoid asserting on error messages** — assert on error type if it is part of the contract, or assert generic error otherwise. -2. **Avoid blocking operations for synchronization** — do not use `time.Sleep`. Design the tested routine to signal back (channels, callbacks, condition variables). -3. **Use testify assertions** — use `stretchr/assert` or `require` instead of `t.Fatal()`. - -**Integration Test Conventions:** - -1. **Package naming** — use folder name as package name (NOT `*_test` suffix): - - `test/integration/gateway/` → `package gateway` - - `test/integration/extension/counter/mysql/` → `package mysql` - - This matches Uber's go-code integration test pattern +1. Create `entity/{domain}/{entity}.go` with test file and `BUILD.bazel` -2. **Bazel target naming** — use Gazelle-generated names and add `tags = ["integration"]`: - - Target name matches folder: `name = "gateway_test"`, `name = "mysql_test"` - - Always include `tags = ["integration"]` to exclude from unit tests - - Include `data = [...]` for docker-compose and schema files +### Testing -3. **Docker Compose-based** — all integration tests use Docker Compose: - - Use `testutil.NewComposeStack()` for hermetic setup - - Provide meaningful test context (e.g., "ext-storage-mysql", "svc-gateway") - - Use `stack.ConnectMySQLService()` or `stack.MySQLServiceDSN()` for DB connections +- **Avoid asserting on error messages** — assert on error type or generic error. +- **No `time.Sleep` for synchronization** — use channels, callbacks, condition variables. +- **Use testify** — `assert`/`require` instead of `t.Fatal()`. -### Code Style Guidelines +**Integration tests** use Docker Compose via `testutil.ComposeStack`: +- Package naming: folder name as package (NOT `*_test` suffix) +- Bazel: add `tags = ["integration"]` and `data = [...]` for compose/schema files +- Use `testutil.NewComposeStack()` with meaningful context (e.g., `"ext-storage-mysql"`) -1. **Use SugaredLogger for structured logging** — always use `zap.SugaredLogger` with structured logging methods: - - `logger.Debugw(msg, key1, val1, key2, val2, ...)` for debug logs - - `logger.Infow(msg, key1, val1, key2, val2, ...)` for info logs - - `logger.Errorw(msg, key1, val1, key2, val2, ...)` for error logs - - Never use unstructured methods like `Debug()`, `Info()`, `Error()`, or `Printf()` - - Example: `logger.Infow("starting consumer", "subscriber_name", subscriberName, "controller_count", len(controllers))` +See [doc/howto/TESTING.md](doc/howto/TESTING.md) for full testing guide. -2. **Use interfaces for contracts** — define interfaces for public APIs and dependencies: - - Public components should return/accept interfaces, not concrete structs - - Unexported structs implement the interfaces - - Makes testing easier through mocking - - Example: `func New(...) Consumer` returns interface, not `*consumer` - - Implementation struct is unexported: `type consumer struct { ... }` +### Code Style +1. **Structured logging** — `zap.SugaredLogger` with `Debugw`/`Infow`/`Errorw(msg, key, val, ...)`. Never unstructured methods. +2. **Interfaces for behavior, structs for data** — use interfaces for behavioral contracts (Consumer, Controller, Storage). Use structs for data containers, configs, and registries (TopicRegistry, SubscriptionConfig). +3. **Value types over pointers** — prefer value types for structs, configs, and return values. Use `(T, bool)` to signal absence instead of `*T`. Pointers only when mutation or shared ownership is needed. diff --git a/Makefile b/Makefile index 9b25664c..4016bf24 100644 --- a/Makefile +++ b/Makefile @@ -62,7 +62,7 @@ deps: ## Install Go dependencies e2e-test: build-all-linux ## Run end-to-end tests (hermetic, auto-builds binaries) @echo "Running end-to-end tests..." - @$(BAZEL) test //test/e2e:e2e_test --test_output=all + @$(BAZEL) test //test/e2e:e2e_test --test_output=streamed gazelle: ## Update BUILD.bazel files @echo "Running Gazelle to update BUILD files..." @@ -70,19 +70,19 @@ gazelle: ## Update BUILD.bazel files integration-test: build-all-linux ## Run all integration tests (auto-builds binaries) @echo "Running all integration tests..." - @$(BAZEL) test //test/integration/... --test_output=errors + @$(BAZEL) test //test/integration/... --test_output=streamed integration-test-extensions: ## Run extension integration tests @echo "Running extension integration tests..." - @$(BAZEL) test //test/integration/extension/... --test_output=errors + @$(BAZEL) test //test/integration/extension/... --test_output=streamed integration-test-gateway: build-gateway-linux ## Run Gateway integration tests (auto-builds binary) @echo "Running Gateway integration tests..." - @$(BAZEL) test //test/integration/gateway:gateway_test --test_output=errors + @$(BAZEL) test //test/integration/gateway:gateway_test --test_output=streamed integration-test-orchestrator: build-orchestrator-linux ## Run Orchestrator integration tests (auto-builds binary) @echo "Running Orchestrator integration tests..." - @$(BAZEL) test //test/integration/orchestrator:orchestrator_test --test_output=errors + @$(BAZEL) test //test/integration/orchestrator:orchestrator_test --test_output=streamed local-clean: ## Stop and remove all local services, volumes, and images @echo "Cleaning all services and data..." diff --git a/core/consumer/BUILD.bazel b/core/consumer/BUILD.bazel index 57403ba5..d3618dcb 100644 --- a/core/consumer/BUILD.bazel +++ b/core/consumer/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "consumer.go", "controller.go", "error.go", + "registry.go", ], importpath = "github.com/uber/submitqueue/core/consumer", visibility = ["//visibility:public"], @@ -22,14 +23,18 @@ go_test( srcs = [ "consumer_test.go", "error_test.go", + "registry_test.go", ], embed = [":consumer"], deps = [ + "//core/consumer/mock", "//entity/queue", "//extension/queue", + "//extension/queue/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_mock//gomock", "@org_uber_go_zap//zaptest", ], ) diff --git a/core/consumer/consumer.go b/core/consumer/consumer.go index 8c75204e..c8e3da6e 100644 --- a/core/consumer/consumer.go +++ b/core/consumer/consumer.go @@ -28,15 +28,14 @@ type Consumer interface { // consumer implements the Consumer interface. type consumer struct { - logger *zap.SugaredLogger - metricsScope tally.Scope - queue queue.Queue - subscriberName string // Unique worker ID (hostname, pod name) + logger *zap.SugaredLogger + metricsScope tally.Scope + registry TopicRegistry mu sync.Mutex stopped bool controllers []Controller - subscriptions map[string]*activeSubscription // topic -> subscription + subscriptions map[Topic]*activeSubscription // topic -> subscription } // activeSubscription tracks the state of an active subscription. @@ -47,14 +46,13 @@ type activeSubscription struct { } // New creates a new consumer. -// subscriberName is the unique worker identifier used for partition leasing (e.g., hostname, pod name). -func New(logger *zap.SugaredLogger, scope tally.Scope, q queue.Queue, subscriberName string) Consumer { +// registry provides queue and subscription config for topics. +func New(logger *zap.SugaredLogger, scope tally.Scope, registry TopicRegistry) Consumer { return &consumer{ - logger: logger, - metricsScope: scope.SubScope("consumer"), - queue: q, - subscriberName: subscriberName, - subscriptions: make(map[string]*activeSubscription), + logger: logger, + metricsScope: scope.SubScope("consumer"), + registry: registry, + subscriptions: make(map[Topic]*activeSubscription), } } @@ -105,7 +103,6 @@ func (m *consumer) Start(ctx context.Context) error { } m.logger.Infow("starting consumer", - "subscriber_name", m.subscriberName, "controller_count", len(m.controllers), ) @@ -127,12 +124,23 @@ func (m *consumer) Start(ctx context.Context) error { // subscribe subscribes a controller to its topic and spawns a consumption goroutine. func (m *consumer) subscribe(ctx context.Context, controller Controller) error { - // Get controller's subscription config - config := controller.SubscriptionConfig(m.subscriberName) + topic := controller.Topic() + consumerGroup := controller.ConsumerGroup() - // Subscribe to topic - subscriber := m.queue.Subscriber() - deliveryChan, err := subscriber.Subscribe(ctx, controller.Topic(), config) + // Get subscription config from registry + config, ok := m.registry.SubscriptionConfig(topic, consumerGroup) + if !ok { + return fmt.Errorf("no subscription config for topic %s, consumer group %s", topic, consumerGroup) + } + + // Get queue for this topic + q, ok := m.registry.Queue(topic) + if !ok { + return fmt.Errorf("no queue registered for topic %s", topic) + } + + subscriber := q.Subscriber() + deliveryChan, err := subscriber.Subscribe(ctx, topic.String(), config) if err != nil { return fmt.Errorf("subscribe failed: %w", err) } @@ -147,15 +155,15 @@ func (m *consumer) subscribe(ctx context.Context, controller Controller) error { cancelFunc: cancel, done: done, } - m.subscriptions[controller.Topic()] = sub + m.subscriptions[topic] = sub // Spawn consumption goroutine go m.consumeLoop(controllerCtx, controller, deliveryChan, done) m.logger.Infow("controller started", "controller", controller.Name(), - "topic", controller.Topic(), - "consumer_group", controller.ConsumerGroup(), + "topic", topic, + "consumer_group", consumerGroup, ) return nil @@ -165,14 +173,16 @@ func (m *consumer) subscribe(ctx context.Context, controller Controller) error { func (m *consumer) consumeLoop(ctx context.Context, controller Controller, deliveryChan <-chan queue.Delivery, done chan struct{}) { defer close(done) + topic := controller.Topic() + controllerScope := m.metricsScope.Tagged(map[string]string{ "controller": controller.Name(), - "topic": controller.Topic(), + "topic": topic.String(), }) m.logger.Debugw("consume loop started", "controller", controller.Name(), - "topic", controller.Topic(), + "topic", topic, ) for { @@ -180,7 +190,7 @@ func (m *consumer) consumeLoop(ctx context.Context, controller Controller, deliv case <-ctx.Done(): m.logger.Infow("consume loop stopped", "controller", controller.Name(), - "topic", controller.Topic(), + "topic", topic, ) return @@ -188,7 +198,7 @@ func (m *consumer) consumeLoop(ctx context.Context, controller Controller, deliv if !ok { m.logger.Infow("delivery channel closed", "controller", controller.Name(), - "topic", controller.Topic(), + "topic", topic, ) return } @@ -204,10 +214,11 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d controllerScope.Counter("messages_received").Inc(1) msg := delivery.Message() + topic := controller.Topic() m.logger.Debugw("processing delivery", "controller", controller.Name(), - "topic", controller.Topic(), + "topic", topic, "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), @@ -263,7 +274,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d // Controller returned retryable error - nack message for retry m.logger.Errorw("controller error, nacking message", "controller", controller.Name(), - "topic", controller.Topic(), + "topic", topic, "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), @@ -278,7 +289,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d if nackErr := delivery.Nack(ctx, 0); nackErr != nil { m.logger.Errorw("failed to nack message", "controller", controller.Name(), - "topic", controller.Topic(), + "topic", topic, "message_id", msg.ID, "error", nackErr, ) @@ -299,7 +310,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d if ackErr := delivery.Ack(ctx); ackErr != nil { m.logger.Errorw("failed to ack message", "controller", controller.Name(), - "topic", controller.Topic(), + "topic", topic, "message_id", msg.ID, "error", ackErr, ) @@ -323,7 +334,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d m.logger.Debugw("message processed successfully", "controller", controller.Name(), - "topic", controller.Topic(), + "topic", topic, "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), @@ -388,7 +399,7 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error { } // Clear subscriptions - m.subscriptions = make(map[string]*activeSubscription) + m.subscriptions = make(map[Topic]*activeSubscription) if timedOut { return fmt.Errorf("timeout waiting for controllers to stop") diff --git a/core/consumer/consumer_test.go b/core/consumer/consumer_test.go index 7cc766ba..3af92387 100644 --- a/core/consumer/consumer_test.go +++ b/core/consumer/consumer_test.go @@ -1,238 +1,105 @@ -package consumer +package consumer_test import ( "context" "fmt" "strings" - "sync" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + consumermock "github.com/uber/submitqueue/core/consumer/mock" "github.com/uber/submitqueue/entity/queue" extqueue "github.com/uber/submitqueue/extension/queue" + queuemock "github.com/uber/submitqueue/extension/queue/mock" + "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) -// Mock Controller -type mockController struct { - name string - topic string - consumerGroup string - processFunc func(ctx context.Context, delivery Delivery) error -} - -func (m *mockController) Process(ctx context.Context, delivery Delivery) error { - return m.processFunc(ctx, delivery) -} - -func (m *mockController) Name() string { - return m.name -} - -func (m *mockController) Topic() string { - return m.topic -} - -func (m *mockController) ConsumerGroup() string { - return m.consumerGroup -} - -func (m *mockController) SubscriptionConfig(subscriberName string) extqueue.SubscriptionConfig { - return extqueue.DefaultSubscriptionConfig(subscriberName, m.consumerGroup) -} - -// Mock Delivery -type mockDelivery struct { - msg queue.Message - attempt int - ackFunc func(ctx context.Context) error - nackFunc func(ctx context.Context, requeueAfterMillis int64) error - rejectFunc func(ctx context.Context, reason string) error - acked bool - nacked bool - rejected bool - rejectReason string - nackDelayMs int64 - done chan struct{} // Signals when ack/nack/reject is called - mu sync.Mutex -} - -func (m *mockDelivery) Message() queue.Message { - return m.msg -} - -func (m *mockDelivery) DeliveryID() string { - return m.msg.ID -} - -func (m *mockDelivery) Attempt() int { - return m.attempt -} - -func (m *mockDelivery) ReceivedAt() int64 { - return time.Now().UnixMilli() -} - -func (m *mockDelivery) Metadata() map[string]string { - return nil -} - -func (m *mockDelivery) Ack(ctx context.Context) error { - m.mu.Lock() - defer m.mu.Unlock() - m.acked = true - if m.done != nil { - close(m.done) - } - if m.ackFunc != nil { - return m.ackFunc(ctx) - } - return nil -} - -func (m *mockDelivery) Nack(ctx context.Context, requeueAfterMillis int64) error { - m.mu.Lock() - defer m.mu.Unlock() - m.nacked = true - m.nackDelayMs = requeueAfterMillis - if m.done != nil { - close(m.done) - } - if m.nackFunc != nil { - return m.nackFunc(ctx, requeueAfterMillis) - } - return nil -} - -func (m *mockDelivery) Reject(ctx context.Context, reason string) error { - m.mu.Lock() - defer m.mu.Unlock() - m.rejected = true - m.rejectReason = reason - if m.done != nil { - close(m.done) - } - if m.rejectFunc != nil { - return m.rejectFunc(ctx, reason) +// setupController configures a MockController with standard expectations. +func setupController(mc *consumermock.MockController, name string, topic consumer.Topic, consumerGroup string, processFunc func(context.Context, consumer.Delivery) error) { + mc.EXPECT().Name().Return(name).AnyTimes() + mc.EXPECT().Topic().Return(topic).AnyTimes() + mc.EXPECT().ConsumerGroup().Return(consumerGroup).AnyTimes() + if processFunc != nil { + mc.EXPECT().Process(gomock.Any(), gomock.Any()).DoAndReturn(processFunc).AnyTimes() } - return nil -} - -func (m *mockDelivery) ExtendVisibilityTimeout(ctx context.Context, durationMillis int64) error { - return nil -} - -func (m *mockDelivery) WasAcked() bool { - m.mu.Lock() - defer m.mu.Unlock() - return m.acked -} - -func (m *mockDelivery) WasNacked() bool { - m.mu.Lock() - defer m.mu.Unlock() - return m.nacked -} - -func (m *mockDelivery) WasRejected() bool { - m.mu.Lock() - defer m.mu.Unlock() - return m.rejected -} - -// Mock Subscriber -type mockSubscriber struct { - subscribeFunc func(ctx context.Context, topic string, config extqueue.SubscriptionConfig) (<-chan extqueue.Delivery, error) } -func (m *mockSubscriber) Subscribe(ctx context.Context, topic string, config extqueue.SubscriptionConfig) (<-chan extqueue.Delivery, error) { - return m.subscribeFunc(ctx, topic, config) -} - -func (m *mockSubscriber) Close() error { - return nil -} - -// Mock Queue -type mockQueue struct { - subscriber extqueue.Subscriber -} - -func (m *mockQueue) Publisher() extqueue.Publisher { - return nil -} - -func (m *mockQueue) Subscriber() extqueue.Subscriber { - return m.subscriber -} - -func (m *mockQueue) Close() error { - return nil +// newRegistry creates a TopicRegistry with a mock queue and default subscription config. +func newRegistry(q extqueue.Queue, topic consumer.Topic, consumerGroup string) consumer.TopicRegistry { + return consumer.NewTopicRegistry( + []consumer.TopicConfig{{Topic: topic, Queue: q}}, + []extqueue.SubscriptionConfig{ + extqueue.DefaultSubscriptionConfig(topic.String(), "test-worker", consumerGroup), + }, + ) +} + +// setupDelivery creates a MockDelivery with standard expectations and a done channel +// that closes when Ack or Nack is called. +func setupDelivery(del *queuemock.MockDelivery, msg queue.Message, ackErr, nackErr error) chan struct{} { + done := make(chan struct{}) + del.EXPECT().Message().Return(msg).AnyTimes() + del.EXPECT().Attempt().Return(1).AnyTimes() + del.EXPECT().ReceivedAt().Return(time.Now().UnixMilli()).AnyTimes() + del.EXPECT().Metadata().Return(nil).AnyTimes() + del.EXPECT().DeliveryID().Return(msg.ID).AnyTimes() + del.EXPECT().Ack(gomock.Any()).DoAndReturn(func(ctx context.Context) error { + close(done) + return ackErr + }).MaxTimes(1) + del.EXPECT().Nack(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, requeueAfterMillis int64) error { + close(done) + return nackErr + }).MaxTimes(1) + return done } func TestNew(t *testing.T) { logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope - q := &mockQueue{} - c := New(logger, scope, q, "test-worker") + reg := consumer.NewTopicRegistry(nil, nil) + c := consumer.New(logger, tally.NoopScope, reg) require.NotNil(t, c) - - // Type assert to access internal fields - impl := c.(*consumer) - assert.Equal(t, "test-worker", impl.subscriberName) - assert.Empty(t, impl.controllers) - assert.Empty(t, impl.subscriptions) } func TestConsumer_Register(t *testing.T) { + ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope - q := &mockQueue{} - c := New(logger, scope, q, "test-worker") - - handler1 := &mockController{ - name: "handler1", - topic: "topic1", - consumerGroup: "group1", - } - handler2 := &mockController{ - name: "handler2", - topic: "topic2", - consumerGroup: "group2", - } - // Register first handler + reg := consumer.NewTopicRegistry(nil, nil) + c := consumer.New(logger, tally.NoopScope, reg) + + handler1 := consumermock.NewMockController(ctrl) + setupController(handler1, "handler1", consumer.TopicRequest, "group1", nil) + + handler2 := consumermock.NewMockController(ctrl) + setupController(handler2, "handler2", consumer.Topic("other-topic"), "group2", nil) + err := c.Register(handler1) require.NoError(t, err) - assert.Len(t, c.(*consumer).controllers, 1) - // Register second handler err = c.Register(handler2) require.NoError(t, err) - assert.Len(t, c.(*consumer).controllers, 2) } func TestConsumer_Register_DuplicateTopic(t *testing.T) { + ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope - q := &mockQueue{} - c := New(logger, scope, q, "test-worker") - - handler1 := &mockController{ - name: "handler1", - topic: "topic1", - consumerGroup: "group1", - } - handler2 := &mockController{ - name: "handler2", - topic: "topic1", // Same topic - consumerGroup: "group2", - } + + reg := consumer.NewTopicRegistry(nil, nil) + c := consumer.New(logger, tally.NoopScope, reg) + + handler1 := consumermock.NewMockController(ctrl) + setupController(handler1, "handler1", consumer.TopicRequest, "group1", nil) + + handler2 := consumermock.NewMockController(ctrl) + setupController(handler2, "handler2", consumer.TopicRequest, "group2", nil) err := c.Register(handler1) require.NoError(t, err) @@ -242,19 +109,17 @@ func TestConsumer_Register_DuplicateTopic(t *testing.T) { } func TestConsumer_Register_AfterStop(t *testing.T) { + ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope - q := &mockQueue{} - c := New(logger, scope, q, "test-worker") + + reg := consumer.NewTopicRegistry(nil, nil) + c := consumer.New(logger, tally.NoopScope, reg) err := c.Stop(1000) require.NoError(t, err) - handler := &mockController{ - name: "handler1", - topic: "topic1", - consumerGroup: "group1", - } + handler := consumermock.NewMockController(ctrl) + setupController(handler, "handler1", consumer.TopicRequest, "group1", nil) err = c.Register(handler) assert.Error(t, err) @@ -262,26 +127,23 @@ func TestConsumer_Register_AfterStop(t *testing.T) { func TestConsumer_Start_NoHandlers(t *testing.T) { logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope - q := &mockQueue{} - c := New(logger, scope, q, "test-worker") - ctx := context.Background() - err := c.Start(ctx) + reg := consumer.NewTopicRegistry(nil, nil) + c := consumer.New(logger, tally.NoopScope, reg) + + err := c.Start(context.Background()) assert.Error(t, err) } func TestConsumer_Start_AfterStop(t *testing.T) { + ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope - q := &mockQueue{} - c := New(logger, scope, q, "test-worker") - - handler := &mockController{ - name: "handler1", - topic: "topic1", - consumerGroup: "group1", - } + + reg := consumer.NewTopicRegistry(nil, nil) + c := consumer.New(logger, tally.NoopScope, reg) + + handler := consumermock.NewMockController(ctrl) + setupController(handler, "handler1", consumer.TopicRequest, "group1", nil) err := c.Register(handler) require.NoError(t, err) @@ -289,34 +151,83 @@ func TestConsumer_Start_AfterStop(t *testing.T) { err = c.Stop(1000) require.NoError(t, err) - ctx := context.Background() - err = c.Start(ctx) + err = c.Start(context.Background()) + assert.Error(t, err) +} + +func TestConsumer_Start_MissingSubscriptionConfig(t *testing.T) { + ctrl := gomock.NewController(t) + logger := zaptest.NewLogger(t).Sugar() + + mockQ := queuemock.NewMockQueue(ctrl) + // Registry has queue but no subscription config + reg := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Topic: consumer.TopicRequest, Queue: mockQ}}, + nil, + ) + + c := consumer.New(logger, tally.NoopScope, reg) + + handler := consumermock.NewMockController(ctrl) + setupController(handler, "handler", consumer.TopicRequest, "group", nil) + + err := c.Register(handler) + require.NoError(t, err) + + err = c.Start(context.Background()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "no subscription config") +} + +func TestConsumer_Start_SubscribeFailure(t *testing.T) { + ctrl := gomock.NewController(t) + logger := zaptest.NewLogger(t).Sugar() + + mockSub := queuemock.NewMockSubscriber(ctrl) + mockSub.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, fmt.Errorf("connection refused")) + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Subscriber().Return(mockSub) + + reg := newRegistry(mockQ, consumer.TopicRequest, "group") + + c := consumer.New(logger, tally.NoopScope, reg) + + handler := consumermock.NewMockController(ctrl) + setupController(handler, "handler", consumer.TopicRequest, "group", nil) + + err := c.Register(handler) + require.NoError(t, err) + + err = c.Start(context.Background()) assert.Error(t, err) + assert.Contains(t, err.Error(), "subscribe failed") } func TestConsumer_ProcessDelivery_Success(t *testing.T) { + ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope deliveryChan := make(chan extqueue.Delivery, 1) - subscriber := &mockSubscriber{ - subscribeFunc: func(ctx context.Context, topic string, config extqueue.SubscriptionConfig) (<-chan extqueue.Delivery, error) { - return deliveryChan, nil - }, - } - q := &mockQueue{subscriber: subscriber} - c := New(logger, scope, q, "test-worker") + mockSub := queuemock.NewMockSubscriber(ctrl) + mockSub.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(deliveryChan, nil) + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Subscriber().Return(mockSub) + + reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + + c := consumer.New(logger, tally.NoopScope, reg) handledMsg := "" - handler := &mockController{ - name: "test-handler", - topic: "test-topic", - consumerGroup: "test-group", - processFunc: func(ctx context.Context, delivery Delivery) error { + handler := consumermock.NewMockController(ctrl) + setupController(handler, "test-handler", consumer.TopicRequest, "test-group", + func(ctx context.Context, delivery consumer.Delivery) error { handledMsg = delivery.Message().ID - return nil // Success + return nil }, - } + ) err := c.Register(handler) require.NoError(t, err) @@ -327,48 +238,40 @@ func TestConsumer_ProcessDelivery_Success(t *testing.T) { err = c.Start(ctx) require.NoError(t, err) - // Send a test message msg := queue.NewMessage("test-msg-1", []byte("payload"), "partition1", nil) - delivery := &mockDelivery{ - msg: msg, - attempt: 1, - done: make(chan struct{}), - } - - deliveryChan <- delivery + mockDel := queuemock.NewMockDelivery(ctrl) + done := setupDelivery(mockDel, msg, nil, nil) - // Wait for processing to complete - <-delivery.done + deliveryChan <- mockDel + <-done assert.Equal(t, "test-msg-1", handledMsg) - assert.True(t, delivery.WasAcked(), "Message should be acked on success") - assert.False(t, delivery.WasNacked(), "Message should not be nacked on success") err = c.Stop(30000) require.NoError(t, err) } func TestConsumer_ProcessDelivery_Error(t *testing.T) { + ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope deliveryChan := make(chan extqueue.Delivery, 1) - subscriber := &mockSubscriber{ - subscribeFunc: func(ctx context.Context, topic string, config extqueue.SubscriptionConfig) (<-chan extqueue.Delivery, error) { - return deliveryChan, nil - }, - } - q := &mockQueue{subscriber: subscriber} - c := New(logger, scope, q, "test-worker") - - handler := &mockController{ - name: "test-handler", - topic: "test-topic", - consumerGroup: "test-group", - processFunc: func(ctx context.Context, delivery Delivery) error { + mockSub := queuemock.NewMockSubscriber(ctrl) + mockSub.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(deliveryChan, nil) + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Subscriber().Return(mockSub) + + reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + + c := consumer.New(logger, tally.NoopScope, reg) + + handler := consumermock.NewMockController(ctrl) + setupController(handler, "test-handler", consumer.TopicRequest, "test-group", + func(ctx context.Context, delivery consumer.Delivery) error { return fmt.Errorf("processing failed") }, - } + ) err := c.Register(handler) require.NoError(t, err) @@ -379,47 +282,38 @@ func TestConsumer_ProcessDelivery_Error(t *testing.T) { err = c.Start(ctx) require.NoError(t, err) - // Send a test message msg := queue.NewMessage("test-msg-2", []byte("payload"), "partition1", nil) - delivery := &mockDelivery{ - msg: msg, - attempt: 2, - done: make(chan struct{}), - } - - deliveryChan <- delivery + mockDel := queuemock.NewMockDelivery(ctrl) + done := setupDelivery(mockDel, msg, nil, nil) - // Wait for processing to complete - <-delivery.done - - assert.False(t, delivery.WasAcked(), "Message should not be acked on error") - assert.True(t, delivery.WasNacked(), "Message should be nacked on error") + deliveryChan <- mockDel + <-done err = c.Stop(30000) require.NoError(t, err) } func TestConsumer_ProcessDelivery_NonRetryableError(t *testing.T) { + ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope deliveryChan := make(chan extqueue.Delivery, 1) - subscriber := &mockSubscriber{ - subscribeFunc: func(ctx context.Context, topic string, config extqueue.SubscriptionConfig) (<-chan extqueue.Delivery, error) { - return deliveryChan, nil - }, - } - q := &mockQueue{subscriber: subscriber} - c := New(logger, scope, q, "test-worker") - - handler := &mockController{ - name: "test-handler", - topic: "test-topic", - consumerGroup: "test-group", - processFunc: func(ctx context.Context, delivery Delivery) error { - return NewNonRetryableError(fmt.Errorf("bad payload")) + mockSub := queuemock.NewMockSubscriber(ctrl) + mockSub.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(deliveryChan, nil) + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Subscriber().Return(mockSub) + + reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + + c := consumer.New(logger, tally.NoopScope, reg) + + handler := consumermock.NewMockController(ctrl) + setupController(handler, "test-handler", consumer.TopicRequest, "test-group", + func(ctx context.Context, delivery consumer.Delivery) error { + return consumer.NewNonRetryableError(fmt.Errorf("bad payload")) }, - } + ) err := c.Register(handler) require.NoError(t, err) @@ -430,48 +324,43 @@ func TestConsumer_ProcessDelivery_NonRetryableError(t *testing.T) { err = c.Start(ctx) require.NoError(t, err) - // Send a test message with non-retryable payload msg := queue.NewMessage("poison-msg", []byte("bad"), "partition1", nil) - delivery := &mockDelivery{ - msg: msg, - attempt: 1, - done: make(chan struct{}), - } - - deliveryChan <- delivery - - // Wait for processing to complete - <-delivery.done - - assert.True(t, delivery.WasRejected(), "Non-retryable message should be rejected") - assert.False(t, delivery.WasAcked(), "Non-retryable message should not be acked directly") - assert.False(t, delivery.WasNacked(), "Non-retryable message should not be nacked") + done := make(chan struct{}) + mockDel := queuemock.NewMockDelivery(ctrl) + mockDel.EXPECT().Message().Return(msg).AnyTimes() + mockDel.EXPECT().Attempt().Return(1).AnyTimes() + mockDel.EXPECT().ReceivedAt().Return(time.Now().UnixMilli()).AnyTimes() + mockDel.EXPECT().Metadata().Return(nil).AnyTimes() + mockDel.EXPECT().DeliveryID().Return(msg.ID).AnyTimes() + mockDel.EXPECT().Reject(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, reason string) error { + close(done) + return nil + }).Times(1) + + deliveryChan <- mockDel + <-done err = c.Stop(30000) require.NoError(t, err) } func TestConsumer_Stop(t *testing.T) { + ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope deliveryChan := make(chan extqueue.Delivery) - subscriber := &mockSubscriber{ - subscribeFunc: func(ctx context.Context, topic string, config extqueue.SubscriptionConfig) (<-chan extqueue.Delivery, error) { - return deliveryChan, nil - }, - } - q := &mockQueue{subscriber: subscriber} - c := New(logger, scope, q, "test-worker") - - handler := &mockController{ - name: "test-handler", - topic: "test-topic", - consumerGroup: "test-group", - processFunc: func(ctx context.Context, delivery Delivery) error { - return nil - }, - } + mockSub := queuemock.NewMockSubscriber(ctrl) + mockSub.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(deliveryChan, nil) + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Subscriber().Return(mockSub) + + reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + + c := consumer.New(logger, tally.NoopScope, reg) + + handler := consumermock.NewMockController(ctrl) + setupController(handler, "test-handler", consumer.TopicRequest, "test-group", nil) err := c.Register(handler) require.NoError(t, err) @@ -482,30 +371,15 @@ func TestConsumer_Stop(t *testing.T) { err = c.Start(ctx) require.NoError(t, err) - assert.Len(t, c.(*consumer).subscriptions, 1) - - // Stop the c + // Stop should complete cleanly err = c.Stop(30000) require.NoError(t, err) - - assert.Empty(t, c.(*consumer).subscriptions, "Subscriptions should be cleared after stop") } func TestConsumer_ObservabilityTags(t *testing.T) { - logger := zaptest.NewLogger(t).Sugar() - - deliveryChan := make(chan extqueue.Delivery, 10) - subscriber := &mockSubscriber{ - subscribeFunc: func(ctx context.Context, topic string, config extqueue.SubscriptionConfig) (<-chan extqueue.Delivery, error) { - return deliveryChan, nil - }, - } - q := &mockQueue{subscriber: subscriber} - tests := []struct { name string handlerError error - ackError error nackError error expectSuccess bool expectAckCount bool @@ -513,7 +387,7 @@ func TestConsumer_ObservabilityTags(t *testing.T) { { name: "success with ack", handlerError: nil, - ackError: nil, + nackError: nil, expectSuccess: true, expectAckCount: true, }, @@ -528,18 +402,27 @@ func TestConsumer_ObservabilityTags(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // Create a fresh scope for each test + ctrl := gomock.NewController(t) + logger := zaptest.NewLogger(t).Sugar() testScope := tally.NewTestScope("consumer", nil) - testC := New(logger, testScope, q, "test-worker") - handler := &mockController{ - name: "test-handler", - topic: "test-topic", - consumerGroup: "test-group", - processFunc: func(ctx context.Context, delivery Delivery) error { + deliveryChan := make(chan extqueue.Delivery, 1) + mockSub := queuemock.NewMockSubscriber(ctrl) + mockSub.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(deliveryChan, nil) + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Subscriber().Return(mockSub) + + reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + + testC := consumer.New(logger, testScope, reg) + + handler := consumermock.NewMockController(ctrl) + setupController(handler, "test-handler", consumer.TopicRequest, "test-group", + func(ctx context.Context, delivery consumer.Delivery) error { return tt.handlerError }, - } + ) err := testC.Register(handler) require.NoError(t, err) @@ -550,56 +433,39 @@ func TestConsumer_ObservabilityTags(t *testing.T) { err = testC.Start(ctx) require.NoError(t, err) - // Send a test message msg := queue.NewMessage("msg-1", []byte("payload"), "partition1", nil) - delivery := &mockDelivery{ - msg: msg, - attempt: 1, - done: make(chan struct{}), - ackFunc: func(ctx context.Context) error { - return tt.ackError - }, - nackFunc: func(ctx context.Context, requeueAfterMillis int64) error { - return tt.nackError - }, - } + mockDel := queuemock.NewMockDelivery(ctrl) + done := setupDelivery(mockDel, msg, nil, tt.nackError) - deliveryChan <- delivery + deliveryChan <- mockDel + <-done - // Wait for processing to complete - <-delivery.done - - // Verify metrics exist snapshot := testScope.Snapshot() - // Check handler latency with success tag exists timers := snapshot.Timers() assert.NotEmpty(t, timers, "Should have timer metrics") - // Check for handler latency metric var foundLatency bool for _, timer := range timers { if strings.Contains(timer.Name(), "controller_latency") { foundLatency = true - // Verify success tag tags := timer.Tags() if tt.expectSuccess { - assert.Equal(t, "true", tags["success"], "Should have success=true tag") + assert.Equal(t, "true", tags["success"]) } else { - assert.Equal(t, "false", tags["success"], "Should have success=false tag") + assert.Equal(t, "false", tags["success"]) } } } assert.True(t, foundLatency, "Should have controller_latency metric") - // Check counters counters := snapshot.Counters() if tt.expectAckCount { var foundAck bool for _, counter := range counters { if strings.Contains(counter.Name(), "ack_count") { foundAck = true - assert.Greater(t, counter.Value(), int64(0), "ack_count should be > 0") + assert.Greater(t, counter.Value(), int64(0)) } } assert.True(t, foundAck, "Should have ack_count metric") @@ -611,26 +477,25 @@ func TestConsumer_ObservabilityTags(t *testing.T) { } func TestConsumer_AckNackLatencyTracking(t *testing.T) { + ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() scope := tally.NewTestScope("consumer", nil) deliveryChan := make(chan extqueue.Delivery, 1) - subscriber := &mockSubscriber{ - subscribeFunc: func(ctx context.Context, topic string, config extqueue.SubscriptionConfig) (<-chan extqueue.Delivery, error) { - return deliveryChan, nil - }, - } - q := &mockQueue{subscriber: subscriber} - c := New(logger, scope, q, "test-worker") - - handler := &mockController{ - name: "test-handler", - topic: "test-topic", - consumerGroup: "test-group", - processFunc: func(ctx context.Context, delivery Delivery) error { - return nil // Success - }, - } + mockSub := queuemock.NewMockSubscriber(ctrl) + mockSub.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(deliveryChan, nil) + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Subscriber().Return(mockSub) + + reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + + c := consumer.New(logger, scope, reg) + + handler := consumermock.NewMockController(ctrl) + setupController(handler, "test-handler", consumer.TopicRequest, "test-group", + func(ctx context.Context, delivery consumer.Delivery) error { return nil }, + ) err := c.Register(handler) require.NoError(t, err) @@ -641,20 +506,13 @@ func TestConsumer_AckNackLatencyTracking(t *testing.T) { err = c.Start(ctx) require.NoError(t, err) - // Send a test message msg := queue.NewMessage("msg-1", []byte("payload"), "partition1", nil) - delivery := &mockDelivery{ - msg: msg, - attempt: 1, - done: make(chan struct{}), - } - - deliveryChan <- delivery + mockDel := queuemock.NewMockDelivery(ctrl) + done := setupDelivery(mockDel, msg, nil, nil) - // Wait for processing to complete - <-delivery.done + deliveryChan <- mockDel + <-done - // Verify we have some timer metrics (latency tracking is working) snapshot := scope.Snapshot() assert.NotEmpty(t, snapshot.Timers(), "Should have timer metrics for latency tracking") assert.NotEmpty(t, snapshot.Counters(), "Should have counter metrics") @@ -664,26 +522,27 @@ func TestConsumer_AckNackLatencyTracking(t *testing.T) { } func TestConsumer_ErrorMetrics(t *testing.T) { + ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() scope := tally.NewTestScope("consumer", nil) deliveryChan := make(chan extqueue.Delivery, 1) - subscriber := &mockSubscriber{ - subscribeFunc: func(ctx context.Context, topic string, config extqueue.SubscriptionConfig) (<-chan extqueue.Delivery, error) { - return deliveryChan, nil - }, - } - q := &mockQueue{subscriber: subscriber} - c := New(logger, scope, q, "test-worker") - - handler := &mockController{ - name: "test-handler", - topic: "test-topic", - consumerGroup: "test-group", - processFunc: func(ctx context.Context, delivery Delivery) error { + mockSub := queuemock.NewMockSubscriber(ctrl) + mockSub.EXPECT().Subscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(deliveryChan, nil) + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Subscriber().Return(mockSub) + + reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + + c := consumer.New(logger, scope, reg) + + handler := consumermock.NewMockController(ctrl) + setupController(handler, "test-handler", consumer.TopicRequest, "test-group", + func(ctx context.Context, delivery consumer.Delivery) error { return fmt.Errorf("processing failed") }, - } + ) err := c.Register(handler) require.NoError(t, err) @@ -694,27 +553,16 @@ func TestConsumer_ErrorMetrics(t *testing.T) { err = c.Start(ctx) require.NoError(t, err) - // Send a test message msg := queue.NewMessage("msg-1", []byte("payload"), "partition1", nil) - delivery := &mockDelivery{ - msg: msg, - attempt: 1, - done: make(chan struct{}), - nackFunc: func(ctx context.Context, requeueAfterMillis int64) error { - return fmt.Errorf("nack failed") - }, - } - - deliveryChan <- delivery + mockDel := queuemock.NewMockDelivery(ctrl) + done := setupDelivery(mockDel, msg, nil, fmt.Errorf("nack failed")) - // Wait for processing to complete - <-delivery.done + deliveryChan <- mockDel + <-done - // Verify error metrics are tracked snapshot := scope.Snapshot() counters := snapshot.Counters() - // Should have handler_errors and nack_errors var hasErrorMetrics bool for _, counter := range counters { if strings.Contains(counter.Name(), "errors") { diff --git a/core/consumer/controller.go b/core/consumer/controller.go index 997ce33a..afafb9b5 100644 --- a/core/consumer/controller.go +++ b/core/consumer/controller.go @@ -1,5 +1,7 @@ package consumer +//go:generate mockgen -source=controller.go -destination=mock/controller_mock.go -package=mock + import ( "context" @@ -84,15 +86,10 @@ type Controller interface { Name() string // Topic returns the topic this controller subscribes to. - Topic() string + Topic() Topic // ConsumerGroup returns the consumer group for offset tracking. // Multiple controllers can share a consumer group to load-balance across workers. // Different consumer groups consume independently. ConsumerGroup() string - - // SubscriptionConfig returns the subscription config for this controller. - // Allows each controller to customize poll interval, batch size, timeouts, retry, DLQ. - // The subscriberName parameter is the unique worker identifier (hostname, pod name). - SubscriptionConfig(subscriberName string) extqueue.SubscriptionConfig } diff --git a/core/consumer/mock/BUILD.bazel b/core/consumer/mock/BUILD.bazel new file mode 100644 index 00000000..d03283fc --- /dev/null +++ b/core/consumer/mock/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "mock", + srcs = ["controller_mock.go"], + importpath = "github.com/uber/submitqueue/core/consumer/mock", + visibility = ["//visibility:public"], + deps = [ + "//core/consumer", + "//entity/queue", + "@org_uber_go_mock//gomock", + ], +) diff --git a/core/consumer/mock/controller_mock.go b/core/consumer/mock/controller_mock.go new file mode 100644 index 00000000..b9ed316b --- /dev/null +++ b/core/consumer/mock/controller_mock.go @@ -0,0 +1,207 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: consumer/controller.go +// +// Generated by this command: +// +// mockgen -source=consumer/controller.go -destination=consumer/mock/controller_mock.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + consumer "github.com/uber/submitqueue/core/consumer" + queue "github.com/uber/submitqueue/entity/queue" + gomock "go.uber.org/mock/gomock" +) + +// MockDelivery is a mock of Delivery interface. +type MockDelivery struct { + ctrl *gomock.Controller + recorder *MockDeliveryMockRecorder + isgomock struct{} +} + +// MockDeliveryMockRecorder is the mock recorder for MockDelivery. +type MockDeliveryMockRecorder struct { + mock *MockDelivery +} + +// NewMockDelivery creates a new mock instance. +func NewMockDelivery(ctrl *gomock.Controller) *MockDelivery { + mock := &MockDelivery{ctrl: ctrl} + mock.recorder = &MockDeliveryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDelivery) EXPECT() *MockDeliveryMockRecorder { + return m.recorder +} + +// Attempt mocks base method. +func (m *MockDelivery) Attempt() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Attempt") + ret0, _ := ret[0].(int) + return ret0 +} + +// Attempt indicates an expected call of Attempt. +func (mr *MockDeliveryMockRecorder) Attempt() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Attempt", reflect.TypeOf((*MockDelivery)(nil).Attempt)) +} + +// DeliveryID mocks base method. +func (m *MockDelivery) DeliveryID() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeliveryID") + ret0, _ := ret[0].(string) + return ret0 +} + +// DeliveryID indicates an expected call of DeliveryID. +func (mr *MockDeliveryMockRecorder) DeliveryID() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeliveryID", reflect.TypeOf((*MockDelivery)(nil).DeliveryID)) +} + +// ExtendVisibilityTimeout mocks base method. +func (m *MockDelivery) ExtendVisibilityTimeout(ctx context.Context, durationMillis int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ExtendVisibilityTimeout", ctx, durationMillis) + ret0, _ := ret[0].(error) + return ret0 +} + +// ExtendVisibilityTimeout indicates an expected call of ExtendVisibilityTimeout. +func (mr *MockDeliveryMockRecorder) ExtendVisibilityTimeout(ctx, durationMillis any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExtendVisibilityTimeout", reflect.TypeOf((*MockDelivery)(nil).ExtendVisibilityTimeout), ctx, durationMillis) +} + +// Message mocks base method. +func (m *MockDelivery) Message() queue.Message { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Message") + ret0, _ := ret[0].(queue.Message) + return ret0 +} + +// Message indicates an expected call of Message. +func (mr *MockDeliveryMockRecorder) Message() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Message", reflect.TypeOf((*MockDelivery)(nil).Message)) +} + +// Metadata mocks base method. +func (m *MockDelivery) Metadata() map[string]string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Metadata") + ret0, _ := ret[0].(map[string]string) + return ret0 +} + +// Metadata indicates an expected call of Metadata. +func (mr *MockDeliveryMockRecorder) Metadata() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Metadata", reflect.TypeOf((*MockDelivery)(nil).Metadata)) +} + +// ReceivedAt mocks base method. +func (m *MockDelivery) ReceivedAt() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReceivedAt") + ret0, _ := ret[0].(int64) + return ret0 +} + +// ReceivedAt indicates an expected call of ReceivedAt. +func (mr *MockDeliveryMockRecorder) ReceivedAt() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceivedAt", reflect.TypeOf((*MockDelivery)(nil).ReceivedAt)) +} + +// MockController is a mock of Controller interface. +type MockController struct { + ctrl *gomock.Controller + recorder *MockControllerMockRecorder + isgomock struct{} +} + +// MockControllerMockRecorder is the mock recorder for MockController. +type MockControllerMockRecorder struct { + mock *MockController +} + +// NewMockController creates a new mock instance. +func NewMockController(ctrl *gomock.Controller) *MockController { + mock := &MockController{ctrl: ctrl} + mock.recorder = &MockControllerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockController) EXPECT() *MockControllerMockRecorder { + return m.recorder +} + +// ConsumerGroup mocks base method. +func (m *MockController) ConsumerGroup() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ConsumerGroup") + ret0, _ := ret[0].(string) + return ret0 +} + +// ConsumerGroup indicates an expected call of ConsumerGroup. +func (mr *MockControllerMockRecorder) ConsumerGroup() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConsumerGroup", reflect.TypeOf((*MockController)(nil).ConsumerGroup)) +} + +// Name mocks base method. +func (m *MockController) Name() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Name") + ret0, _ := ret[0].(string) + return ret0 +} + +// Name indicates an expected call of Name. +func (mr *MockControllerMockRecorder) Name() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockController)(nil).Name)) +} + +// Process mocks base method. +func (m *MockController) Process(ctx context.Context, delivery consumer.Delivery) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Process", ctx, delivery) + ret0, _ := ret[0].(error) + return ret0 +} + +// Process indicates an expected call of Process. +func (mr *MockControllerMockRecorder) Process(ctx, delivery any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Process", reflect.TypeOf((*MockController)(nil).Process), ctx, delivery) +} + +// Topic mocks base method. +func (m *MockController) Topic() consumer.Topic { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Topic") + ret0, _ := ret[0].(consumer.Topic) + return ret0 +} + +// Topic indicates an expected call of Topic. +func (mr *MockControllerMockRecorder) Topic() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Topic", reflect.TypeOf((*MockController)(nil).Topic)) +} diff --git a/core/consumer/registry.go b/core/consumer/registry.go new file mode 100644 index 00000000..af8535fb --- /dev/null +++ b/core/consumer/registry.go @@ -0,0 +1,80 @@ +package consumer + +import "github.com/uber/submitqueue/extension/queue" + +// Topic identifies a queue topic in the pipeline. +type Topic string + +const ( + // TopicRequest is where new requests arrive from the gateway. + TopicRequest Topic = "request" + // TopicToBatch is where validated requests are published for batching. + TopicToBatch Topic = "to-batch" +) + +// String returns the topic name as a string. +func (t Topic) String() string { + return string(t) +} + +// TopicConfig maps a topic to its queue backend. +type TopicConfig struct { + // Topic is the topic identifier. + Topic Topic + // Queue is the queue backend for this topic. + Queue queue.Queue +} + +// TopicRegistry provides queue and subscription config for topics. +// Each topic can have a different queue backend. +type TopicRegistry struct { + queues map[Topic]queue.Queue + subscriptionConfigs map[topicGroup]queue.SubscriptionConfig +} + +// topicGroup identifies a topic and consumer group pair. +type topicGroup struct { + topic Topic + consumerGroup string +} + +// NewTopicRegistry creates a new TopicRegistry. +// - topicConfigs: maps each topic to its queue backend +// - subscriptionConfigs: subscription configurations for each topic+consumerGroup +func NewTopicRegistry( + topicConfigs []TopicConfig, + subscriptionConfigs []queue.SubscriptionConfig, +) TopicRegistry { + queues := make(map[Topic]queue.Queue, len(topicConfigs)) + for _, tc := range topicConfigs { + queues[tc.Topic] = tc.Queue + } + + configs := make(map[topicGroup]queue.SubscriptionConfig, len(subscriptionConfigs)) + for _, cfg := range subscriptionConfigs { + key := topicGroup{ + topic: Topic(cfg.Topic), + consumerGroup: cfg.ConsumerGroup, + } + configs[key] = cfg + } + + return TopicRegistry{ + queues: queues, + subscriptionConfigs: configs, + } +} + +// Queue returns the queue backend for the given topic. +// Returns ok=false if no queue is registered for this topic. +func (r TopicRegistry) Queue(topic Topic) (queue.Queue, bool) { + q, ok := r.queues[topic] + return q, ok +} + +// SubscriptionConfig returns the subscription configuration for the given topic and consumer group. +// Returns ok=false if no configuration is registered. +func (r TopicRegistry) SubscriptionConfig(topic Topic, consumerGroup string) (queue.SubscriptionConfig, bool) { + cfg, ok := r.subscriptionConfigs[topicGroup{topic: topic, consumerGroup: consumerGroup}] + return cfg, ok +} diff --git a/core/consumer/registry_test.go b/core/consumer/registry_test.go new file mode 100644 index 00000000..1c0197da --- /dev/null +++ b/core/consumer/registry_test.go @@ -0,0 +1,194 @@ +package consumer_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/core/consumer" + extqueue "github.com/uber/submitqueue/extension/queue" + queuemock "github.com/uber/submitqueue/extension/queue/mock" + "go.uber.org/mock/gomock" +) + +func TestNewTopicRegistry(t *testing.T) { + tests := []struct { + name string + configs []extqueue.SubscriptionConfig + }{ + { + name: "with configs", + configs: []extqueue.SubscriptionConfig{ + extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a"), + extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-b"), + }, + }, + { + name: "nil configs", + configs: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockQ := queuemock.NewMockQueue(ctrl) + + registry := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Topic: consumer.TopicRequest, Queue: mockQ}}, + tt.configs, + ) + + q, ok := registry.Queue(consumer.TopicRequest) + require.True(t, ok) + assert.Equal(t, mockQ, q) + + if tt.configs == nil { + _, ok := registry.SubscriptionConfig(consumer.TopicRequest, "group-a") + assert.False(t, ok) + } + }) + } +} + +func TestTopicRegistry_SubscriptionConfig(t *testing.T) { + tests := []struct { + name string + configs []extqueue.SubscriptionConfig + lookupTopic consumer.Topic + lookupGroup string + expectFound bool + expectedGroup string + }{ + { + name: "found group-a", + configs: []extqueue.SubscriptionConfig{ + extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a"), + extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-b"), + }, + lookupTopic: consumer.TopicRequest, + lookupGroup: "group-a", + expectFound: true, + expectedGroup: "group-a", + }, + { + name: "found group-b", + configs: []extqueue.SubscriptionConfig{ + extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a"), + extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-b"), + }, + lookupTopic: consumer.TopicRequest, + lookupGroup: "group-b", + expectFound: true, + expectedGroup: "group-b", + }, + { + name: "not found by group", + configs: []extqueue.SubscriptionConfig{ + extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a"), + }, + lookupTopic: consumer.TopicRequest, + lookupGroup: "nonexistent", + expectFound: false, + }, + { + name: "not found by topic", + configs: []extqueue.SubscriptionConfig{ + extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a"), + }, + lookupTopic: consumer.Topic("other"), + lookupGroup: "group-a", + expectFound: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockQ := queuemock.NewMockQueue(ctrl) + + registry := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Topic: consumer.TopicRequest, Queue: mockQ}}, + tt.configs, + ) + config, ok := registry.SubscriptionConfig(tt.lookupTopic, tt.lookupGroup) + + if !tt.expectFound { + assert.False(t, ok) + } else { + require.True(t, ok) + assert.Equal(t, tt.expectedGroup, config.ConsumerGroup) + } + }) + } +} + +func TestTopicRegistry_DuplicateConfig_LastWins(t *testing.T) { + ctrl := gomock.NewController(t) + mockQ := queuemock.NewMockQueue(ctrl) + + config1 := extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a") + config1.BatchSize = 10 + + config2 := extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a") + config2.BatchSize = 50 + + registry := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Topic: consumer.TopicRequest, Queue: mockQ}}, + []extqueue.SubscriptionConfig{config1, config2}, + ) + + config, ok := registry.SubscriptionConfig(consumer.TopicRequest, "group-a") + require.True(t, ok) + assert.Equal(t, 50, config.BatchSize) +} + +func TestTopicRegistry_Queue_PerTopic(t *testing.T) { + ctrl := gomock.NewController(t) + mockQ1 := queuemock.NewMockQueue(ctrl) + mockQ2 := queuemock.NewMockQueue(ctrl) + + registry := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Topic: consumer.TopicRequest, Queue: mockQ1}, + {Topic: consumer.TopicToBatch, Queue: mockQ2}, + }, + nil, + ) + + q1, ok := registry.Queue(consumer.TopicRequest) + require.True(t, ok) + assert.Equal(t, mockQ1, q1) + + q2, ok := registry.Queue(consumer.TopicToBatch) + require.True(t, ok) + assert.Equal(t, mockQ2, q2) + + _, ok = registry.Queue(consumer.Topic("nonexistent")) + assert.False(t, ok) +} + +func TestTopic_String(t *testing.T) { + tests := []struct { + name string + topic consumer.Topic + expected string + }{ + { + name: "predefined topic", + topic: consumer.TopicRequest, + expected: "request", + }, + { + name: "custom topic", + topic: consumer.Topic("custom"), + expected: "custom", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.topic.String()) + }) + } +} diff --git a/doc/PROJECT_STRUCTURE.md b/doc/PROJECT_STRUCTURE.md deleted file mode 100644 index dfb33a06..00000000 --- a/doc/PROJECT_STRUCTURE.md +++ /dev/null @@ -1,178 +0,0 @@ -# Project Structure - -This document describes the organization of the SubmitQueue repository. - -## Directory Layout - -``` -submitqueue/ -├── .bazelversion # Pins Bazel version -├── .envrc # direnv configuration -├── .docker-bin/ # Linux binaries for Docker (gitignored) -├── MODULE.bazel # Bzlmod dependency management -├── go.mod # Go module dependencies -├── Makefile # Build automation -├── BUILD.bazel # Root build file -│ -├── tool/ # Bazel tooling (Bazelisk wrapper) -│ -├── gateway/ # Gateway service - entry point for external requests -│ ├── controller/ # Business logic (land, ping) -│ ├── proto/ # Proto definitions -│ └── protopb/ # Generated proto code (*.pb.go, *_grpc.pb.go, *.pb.yarpc.go) -│ -├── orchestrator/ # Orchestrator service - processes requests via queues -│ ├── controller/ # Business logic (request, ping) -│ ├── proto/ # Proto definitions -│ └── protopb/ # Generated proto code -│ -├── entity/ # Domain entities (Request, Change, enums) -│ └── queue/ # Queue-specific entities (Message) -│ -├── extension/ # Pluggable backend implementations -│ ├── counter/ # Sequential number generation interface -│ │ └── mysql/ # MySQL implementation + schema -│ │ -│ ├── queue/ # Messaging queue abstraction -│ │ └── sql/ # SQL (MySQL) implementation + schema -│ │ -│ └── storage/ # Storage abstraction -│ └── mysql/ # MySQL implementation + schema -│ -├── consumer/ # Reusable queue consumer infrastructure -│ # (Handler interface, Consumer) -│ -├── example/ # Runnable examples -│ ├── server/ # Server implementations with Docker Compose -│ │ ├── gateway/ # Gateway server + Dockerfile -│ │ └── orchestrator/ # Orchestrator server + Dockerfile -│ └── client/ # Client examples (gateway, orchestrator) -│ -├── test/ # All tests -│ ├── e2e/ # End-to-end tests (full stack) -│ ├── integration/ # Integration tests -│ │ ├── gateway/ # Gateway service tests -│ │ ├── orchestrator/ # Orchestrator service tests -│ │ └── extension/ # Extension implementation tests -│ │ ├── counter/mysql/ -│ │ ├── queue/sql/ -│ │ └── storage/mysql/ -│ └── testutil/ # Test utilities (Docker Compose, MySQL, servers) -│ -└── doc/ # Documentation - ├── CLAUDE.md # Development guidelines - ├── PROJECT_STRUCTURE.md # This file - ├── howto/ # How-to guides - │ └── TESTING.md # Testing guide - └── rfc/ # Design documents and proposals -``` - -## Key Design Principles - -### 1. Clean Architecture with Interface-Driven Extensions - -**Extensions** are pluggable, vendor-agnostic interfaces: -- `extension/{extension}/` - Interface definitions -- `extension/{extension}/{impl}/` - Implementations (e.g., `mysql/`) -- Each extension has its own schema files - -**Examples:** -- `extension/storage/` - Storage interface, MySQL implementation -- `extension/queue/` - Queue interface, SQL implementation -- `extension/counter/` - Counter interface, MySQL implementation - -### 2. Service Structure - -Each service follows a consistent layout: -- `controller/` - Pure business logic (transport-agnostic) -- `proto/` - Proto definitions (`.proto` files) -- `protopb/` - Generated proto code (committed to repo) - -**Controllers** contain pure business logic, independent of gRPC/YARPC transport layer. - -### 3. Separate `proto/` and `protopb/` Directories - -Each service has: -- `proto/` - Contains the `.proto` file(s) -- `protopb/` - Contains all generated files (`.pb.go`, `_grpc.pb.go`, `.pb.yarpc.go`) - -This separation makes it clear what is source vs. generated. **All generated files are committed** to the repository. - -### 4. YARPC Support - -All proto files generate three types of files: -- `*.pb.go` - Standard protobuf code -- `*_grpc.pb.go` - gRPC service code -- `*.pb.yarpc.go` - YARPC service code for Uber's RPC framework - -This allows services to support both gRPC and YARPC clients. - -### 5. Entity-Driven Design - -Domain entities live in `entity/`: -- Pure, framework-agnostic value types -- Use `int64` for timestamps (Unix milliseconds) -- Reference other entities by ID, not directly -- String enums with clear names - -### 6. Consumer Infrastructure - -The `consumer/` package provides reusable queue consumer infrastructure: -- `Handler` interface - Business logic for processing messages -- `Manager` - Orchestrates multiple consumers across different topics -- Services register handlers and the manager handles subscriptions, polling, ack/nack - -### 7. Docker-Based Testing - -All integration and e2e tests use Docker Compose: -- Tests in `test/integration/` for services and extensions -- Tests in `test/e2e/` for full stack -- `test/testutil/` provides Docker Compose helpers -- Hermetic, parallel-safe, auto cleanup - -### 8. Python-Based Bazel Wrapper - -The `tool/bazel` script is a Python implementation of Bazelisk that: -- Reads `.bazelversion` to determine which Bazel version to use -- Downloads and caches the appropriate Bazel binary -- Delegates to the correct version automatically - -### 9. Two Separate Databases - -SubmitQueue demonstrates proper architectural separation: -- **Application DB** (port 3306) - Business data (requests, counters) -- **Queue DB** (port 3307) - Messaging infrastructure (messages, offsets, leases) - -This allows the queue to be swapped for other technologies (Kafka, SQS, etc.) in production. - -## Build System - -- **Bazel with Bzlmod** (NOT WORKSPACE) for dependency management -- **Version pinning**: `.bazelversion` pins the Bazel version -- **Go version**: Defined in `go.mod`, read by `MODULE.bazel` via `go_sdk.from_file()` -- **External dependencies**: Must be added to both `go.mod` AND `MODULE.bazel` -- **BUILD files**: Every Go package must have a `BUILD.bazel` file - -## Services - -### Gateway -Entry point for external requests. Receives land requests, stores in DB, publishes to queue. - -### Orchestrator -The engine that processes requests through a multi-stage pipeline via queues. - -## Testing - -See [howto/TESTING.md](howto/TESTING.md) for comprehensive testing guide. - -**Quick overview:** -- **Unit tests** - Co-located with code, fast, no Docker -- **Integration tests** - `test/integration/`, Docker-based, hermetic -- **E2E tests** - `test/e2e/`, full stack, Docker-based - -All automated tests use Docker Compose with unique container prefixes for parallel execution. - -## See Also - -- [CLAUDE.md](../CLAUDE.md) - Development guidelines and coding conventions -- [howto/TESTING.md](howto/TESTING.md) - Comprehensive testing documentation diff --git a/doc/howto/TESTING.md b/doc/howto/TESTING.md index bea064e8..a430da68 100644 --- a/doc/howto/TESTING.md +++ b/doc/howto/TESTING.md @@ -86,11 +86,12 @@ make build-all-linux # Build Linux binaries for Docker ### How Automated Tests Work -Tests use **docker-compose** to spin up containers automatically: +Tests use **docker-compose** via `ComposeStack` to spin up containers automatically: -1. `SetupSuite()` - Creates MySQL + service containers **once** per test suite -2. All tests run against those containers -3. `TearDownSuite()` - Cleans up containers automatically +1. `NewComposeStack()` registers cleanup (stop log tailing, tear down containers) +2. `Up()` starts containers, waits for healthchecks (`--wait`), and auto-tails container logs to stderr +3. Tests run against those containers with **real-time log output** +4. On cleanup, containers are torn down automatically (set `SKIP_CLEANUP=true` to keep them for inspection) --- @@ -167,6 +168,10 @@ sq-test-e2e-def456-orchestrator-service-1 ### Debugging with Container Names +Container logs are **automatically streamed to stderr** during test runs, so you'll see service output (startup messages, errors, zap logs) in real time — both locally and in CI. + +For additional manual inspection: + ```bash # See what tests are currently running docker ps --format "table {{.Names}}\t{{.Status}}" | grep sq-test @@ -174,9 +179,6 @@ docker ps --format "table {{.Names}}\t{{.Status}}" | grep sq-test # Find all containers from gateway test docker ps | grep sq-test-gateway -# View logs from specific test container -docker logs sq-test-gateway-abc123-mysql-app-1 - # Inspect a specific test's MySQL docker exec -it sq-test-ext-counter-2ce1d0-mysql-1 \ mysql -uroot -proot submitqueue -e "SHOW TABLES;" @@ -353,6 +355,9 @@ make integration-test ``` ### Containers Not Cleaning Up + +Containers are torn down automatically after each test. Set `SKIP_CLEANUP=true` to keep them for inspection. + ```bash # List all test containers docker ps -a | grep sq-test @@ -400,8 +405,7 @@ assert.Equal(s.T(), "expected", resp.Value) ## See Also -- [PROJECT_STRUCTURE.md](PROJECT_STRUCTURE.md) - Project organization -- [CLAUDE.md](../../CLAUDE.md) - Development guidelines +- [CLAUDE.md](../../CLAUDE.md) - Development guidelines and project structure - [example/server/docker-compose.yml](../../example/server/docker-compose.yml) - Full stack service definitions - [example/server/gateway/docker-compose.yml](../../example/server/gateway/docker-compose.yml) - Gateway isolation - [example/server/orchestrator/docker-compose.yml](../../example/server/orchestrator/docker-compose.yml) - Orchestrator isolation diff --git a/example/server/gateway/BUILD.bazel b/example/server/gateway/BUILD.bazel index 8e40705b..bf84c820 100644 --- a/example/server/gateway/BUILD.bazel +++ b/example/server/gateway/BUILD.bazel @@ -11,10 +11,9 @@ go_library( importpath = "github.com/uber/submitqueue/example/server/gateway", visibility = ["//visibility:private"], deps = [ + "//core/consumer", "//extension/counter/mysql", - "//extension/queue", "//extension/queue/sql", - "//extension/storage", "//extension/storage/mysql", "//gateway/controller", "//gateway/protopb", diff --git a/example/server/gateway/main.go b/example/server/gateway/main.go index 78d27a5b..ef18ef71 100644 --- a/example/server/gateway/main.go +++ b/example/server/gateway/main.go @@ -13,10 +13,9 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" - "github.com/uber/submitqueue/extension/queue" queueSQL "github.com/uber/submitqueue/extension/queue/sql" - "github.com/uber/submitqueue/extension/storage" "github.com/uber/submitqueue/extension/storage/mysql" "github.com/uber/submitqueue/gateway/controller" pb "github.com/uber/submitqueue/gateway/protopb" @@ -92,103 +91,58 @@ func run() error { metricsWgDone.Wait() }() - // Initialize MySQL storage (with retries for MySQL startup) - mysqlDSN := os.Getenv("MYSQL_DSN") - if mysqlDSN == "" { - mysqlDSN = "root:root@tcp(localhost:3306)/submitqueue?parseTime=true" - } - var store storage.Storage - maxRetries := 10 - retryInterval := time.Second - for i := 0; i < maxRetries; i++ { - store, err = mysql.NewStorage(mysql.MySQLParameters{ - DSN: mysqlDSN, - MaxOpenConns: 10, - MaxIdleConns: 5, - ConnMaxLifetime: 5 * time.Minute, - }) - if err == nil { - break - } - if i < maxRetries-1 { - logger.Warn("failed to create MySQL storage, retrying...", - zap.Int("attempt", i+1), - zap.Int("max_retries", maxRetries), - zap.Error(err), - ) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(retryInterval): - } - } + // Open application database connection + // Docker Compose healthchecks ensure MySQL is ready before service starts + appDSN := os.Getenv("MYSQL_DSN") + if appDSN == "" { + return fmt.Errorf("MYSQL_DSN environment variable is required") } + appDB, err := sql.Open("mysql", appDSN) if err != nil { - return fmt.Errorf("failed to create MySQL storage after %d retries: %w", maxRetries, err) + return fmt.Errorf("failed to open app database: %w", err) } - defer store.Close() + defer appDB.Close() - // Initialize MySQL counter - counterDB, err := sql.Open("mysql", mysqlDSN) + // Initialize storage and counter from shared app database connection + store, err := mysql.NewStorage(appDB) if err != nil { - return fmt.Errorf("failed to open MySQL connection for counter: %w", err) + return fmt.Errorf("failed to create storage: %w", err) } - defer counterDB.Close() - cnt := mysqlcounter.NewCounter(counterDB) + cnt := mysqlcounter.NewCounter(appDB) - // Initialize queue + // Open queue database connection queueDSN := os.Getenv("QUEUE_MYSQL_DSN") if queueDSN == "" { return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required") } - - // Create gRPC server - grpcServer := grpc.NewServer() - - // Create controllers and wrap them for gRPC - pingController := controller.NewPingController(logger, scope) - queueDB, err := sql.Open("mysql", queueDSN) if err != nil { - return fmt.Errorf("failed to open MySQL connection for queue: %w", err) + return fmt.Errorf("failed to open queue database: %w", err) } defer queueDB.Close() - // Retry queue creation (with retries for MySQL startup) - var q queue.Queue - maxRetries = 10 - retryInterval = time.Second - for i := 0; i < maxRetries; i++ { - q, err = queueSQL.NewQueue(queueSQL.Params{ - DB: queueDB, - Logger: logger, - MetricsScope: scope.SubScope("queue"), - }) - if err == nil { - break - } - if i < maxRetries-1 { - logger.Warn("failed to create queue, retrying...", - zap.Int("attempt", i+1), - zap.Int("max_retries", maxRetries), - zap.Error(err), - ) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(retryInterval): - } - } - } + // Initialize queue + sqlQueue, err := queueSQL.NewQueue(queueSQL.Params{ + DB: queueDB, + Logger: logger, + MetricsScope: scope.SubScope("queue"), + }) if err != nil { - return fmt.Errorf("failed to create queue after %d retries: %w", maxRetries, err) + return fmt.Errorf("failed to create queue: %w", err) } - defer q.Close() + defer sqlQueue.Close() + + logger.Info("initialized dependencies", + zap.String("app_dsn", appDSN), + zap.String("queue_dsn", queueDSN), + ) - logger.Info("queue initialized", zap.String("dsn", queueDSN)) + // Create gRPC server + grpcServer := grpc.NewServer() - // Land controller requires queue publisher - landController := controller.NewLandController(logger.Sugar(), scope, store, cnt, q.Publisher()) + // Create controllers and wrap them for gRPC + pingController := controller.NewPingController(logger, scope) + landController := controller.NewLandController(logger.Sugar(), scope, store, cnt, sqlQueue.Publisher(), consumer.TopicRequest.String()) gatewayServer := &GatewayServer{ pingController: pingController, landController: landController, diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 2b41534a..84e45bad 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -14,7 +14,7 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/extension/queue" + extqueue "github.com/uber/submitqueue/extension/queue" queueSQL "github.com/uber/submitqueue/extension/queue/sql" "github.com/uber/submitqueue/orchestrator/controller" "github.com/uber/submitqueue/orchestrator/controller/request" @@ -85,75 +85,74 @@ func run() error { metricsWgDone.Wait() }() - // Initialize queue (optional - only if QUEUE_MYSQL_DSN is provided) - // This allows the server to start without queue infrastructure for basic testing + // Open queue database connection + // Docker Compose healthchecks ensure MySQL is ready before service starts queueDSN := os.Getenv("QUEUE_MYSQL_DSN") - var c consumer.Consumer - if queueDSN != "" { - queueDB, err := sql.Open("mysql", queueDSN) - if err != nil { - return fmt.Errorf("failed to open MySQL connection for queue: %w", err) - } - defer queueDB.Close() - - // Retry queue creation (with retries for MySQL startup) - var q queue.Queue - maxRetries := 10 - retryInterval := time.Second - for i := 0; i < maxRetries; i++ { - q, err = queueSQL.NewQueue(queueSQL.Params{ - DB: queueDB, - Logger: logger, - MetricsScope: scope.SubScope("queue"), - }) - if err == nil { - break - } - if i < maxRetries-1 { - logger.Warn("failed to create queue, retrying...", - zap.Int("attempt", i+1), - zap.Int("max_retries", maxRetries), - zap.Error(err), - ) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(retryInterval): - } - } - } - if err != nil { - return fmt.Errorf("failed to create queue after %d retries: %w", maxRetries, err) - } - defer q.Close() - - logger.Info("queue initialized", zap.String("dsn", queueDSN)) - - // Create consumer - subscriberName := os.Getenv("HOSTNAME") - if subscriberName == "" { - subscriberName = fmt.Sprintf("orchestrator-%d", time.Now().Unix()) - } + if queueDSN == "" { + return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required") + } + queueDB, err := sql.Open("mysql", queueDSN) + if err != nil { + return fmt.Errorf("failed to open queue database: %w", err) + } + defer queueDB.Close() + + // Initialize queue + sqlQueue, err := queueSQL.NewQueue(queueSQL.Params{ + DB: queueDB, + Logger: logger, + MetricsScope: scope.SubScope("queue"), + }) + if err != nil { + return fmt.Errorf("failed to create queue: %w", err) + } + defer sqlQueue.Close() - c = consumer.New(logger.Sugar(), scope.SubScope("consumer"), q, subscriberName) + logger.Info("initialized queue", zap.String("dsn", queueDSN)) - // Register handlers for the pipeline - requestHandler := request.NewController(logger.Sugar(), scope) - if err := c.Register(requestHandler); err != nil { - return fmt.Errorf("failed to register request handler: %w", err) - } + // Create topic registry + subscriberName := os.Getenv("HOSTNAME") + if subscriberName == "" { + subscriberName = fmt.Sprintf("orchestrator-%d", time.Now().Unix()) + } - logger.Info("handlers registered", zap.Int("count", 1)) + registry := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Topic: consumer.TopicRequest, Queue: sqlQueue}, + {Topic: consumer.TopicToBatch, Queue: sqlQueue}, + }, + []extqueue.SubscriptionConfig{ + extqueue.DefaultSubscriptionConfig( + consumer.TopicRequest.String(), + subscriberName, + "orchestrator-request", + ), + }, + ) + + // Create consumer + c := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry) + + // Register request controller + // Pipeline: request → batch → speculation → build → merge + requestController := request.NewController( + logger.Sugar(), + scope, + registry, + consumer.TopicRequest, + "orchestrator-request", + ) + if err := c.Register(requestController); err != nil { + return fmt.Errorf("failed to register request controller: %w", err) + } - // Start consumers - if err := c.Start(ctx); err != nil { - return fmt.Errorf("failed to start consumers: %w", err) - } + logger.Info("controllers registered", zap.Int("count", 1)) - logger.Info("consumer started") - } else { - logger.Warn("queue infrastructure disabled (QUEUE_MYSQL_DSN not set)") + // Start consumers + if err := c.Start(ctx); err != nil { + return fmt.Errorf("failed to start consumers: %w", err) } + logger.Info("consumer started") // Create gRPC server grpcServer := grpc.NewServer() @@ -191,22 +190,14 @@ func run() error { select { case <-ctx.Done(): fmt.Println("\nShutting down orchestrator server...") - if c != nil { - if err := c.Stop(30000); err != nil { - logger.Error("consumer stop error", zap.Error(err)) - } - } + c.Stop(30000) // Stop consumers with 30s timeout grpcServer.GracefulStop() _ = <-serverErrCh // Wait for the server to exit and ignore the error case errCh := <-serverErrCh: if errCh != nil { err = fmt.Errorf("\nServer exited with error: %w\n", errCh) } - if c != nil { - if stopErr := c.Stop(30000); stopErr != nil { - logger.Error("consumer stop error", zap.Error(stopErr)) - } - } + c.Stop(30000) // Stop consumers with 30s timeout } return err diff --git a/extension/queue/mock/BUILD.bazel b/extension/queue/mock/BUILD.bazel index 787b3047..0bc7d1c2 100644 --- a/extension/queue/mock/BUILD.bazel +++ b/extension/queue/mock/BUILD.bazel @@ -2,11 +2,17 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "mock", - srcs = ["delivery.go"], + srcs = [ + "delivery.go", + "publisher.go", + "queue.go", + "subscriber.go", + ], importpath = "github.com/uber/submitqueue/extension/queue/mock", visibility = ["//visibility:public"], deps = [ "//entity/queue", + "//extension/queue", "@org_uber_go_mock//gomock", ], ) diff --git a/extension/queue/mock/publisher.go b/extension/queue/mock/publisher.go new file mode 100644 index 00000000..5c32d682 --- /dev/null +++ b/extension/queue/mock/publisher.go @@ -0,0 +1,70 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: extension/queue/publisher.go +// +// Generated by this command: +// +// mockgen -source=extension/queue/publisher.go -destination=extension/queue/mock/publisher.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + queue "github.com/uber/submitqueue/entity/queue" + gomock "go.uber.org/mock/gomock" +) + +// MockPublisher is a mock of Publisher interface. +type MockPublisher struct { + ctrl *gomock.Controller + recorder *MockPublisherMockRecorder + isgomock struct{} +} + +// MockPublisherMockRecorder is the mock recorder for MockPublisher. +type MockPublisherMockRecorder struct { + mock *MockPublisher +} + +// NewMockPublisher creates a new mock instance. +func NewMockPublisher(ctrl *gomock.Controller) *MockPublisher { + mock := &MockPublisher{ctrl: ctrl} + mock.recorder = &MockPublisherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPublisher) EXPECT() *MockPublisherMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockPublisher) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockPublisherMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockPublisher)(nil).Close)) +} + +// Publish mocks base method. +func (m *MockPublisher) Publish(ctx context.Context, topic string, message queue.Message) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Publish", ctx, topic, message) + ret0, _ := ret[0].(error) + return ret0 +} + +// Publish indicates an expected call of Publish. +func (mr *MockPublisherMockRecorder) Publish(ctx, topic, message any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockPublisher)(nil).Publish), ctx, topic, message) +} diff --git a/extension/queue/mock/queue.go b/extension/queue/mock/queue.go new file mode 100644 index 00000000..40afed2e --- /dev/null +++ b/extension/queue/mock/queue.go @@ -0,0 +1,83 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: extension/queue/queue.go +// +// Generated by this command: +// +// mockgen -source=extension/queue/queue.go -destination=extension/queue/mock/queue.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + queue "github.com/uber/submitqueue/extension/queue" + gomock "go.uber.org/mock/gomock" +) + +// MockQueue is a mock of Queue interface. +type MockQueue struct { + ctrl *gomock.Controller + recorder *MockQueueMockRecorder + isgomock struct{} +} + +// MockQueueMockRecorder is the mock recorder for MockQueue. +type MockQueueMockRecorder struct { + mock *MockQueue +} + +// NewMockQueue creates a new mock instance. +func NewMockQueue(ctrl *gomock.Controller) *MockQueue { + mock := &MockQueue{ctrl: ctrl} + mock.recorder = &MockQueueMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockQueue) EXPECT() *MockQueueMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockQueue) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockQueueMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockQueue)(nil).Close)) +} + +// Publisher mocks base method. +func (m *MockQueue) Publisher() queue.Publisher { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Publisher") + ret0, _ := ret[0].(queue.Publisher) + return ret0 +} + +// Publisher indicates an expected call of Publisher. +func (mr *MockQueueMockRecorder) Publisher() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publisher", reflect.TypeOf((*MockQueue)(nil).Publisher)) +} + +// Subscriber mocks base method. +func (m *MockQueue) Subscriber() queue.Subscriber { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Subscriber") + ret0, _ := ret[0].(queue.Subscriber) + return ret0 +} + +// Subscriber indicates an expected call of Subscriber. +func (mr *MockQueueMockRecorder) Subscriber() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscriber", reflect.TypeOf((*MockQueue)(nil).Subscriber)) +} diff --git a/extension/queue/mock/subscriber.go b/extension/queue/mock/subscriber.go new file mode 100644 index 00000000..3cb7c6b9 --- /dev/null +++ b/extension/queue/mock/subscriber.go @@ -0,0 +1,71 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: extension/queue/subscriber.go +// +// Generated by this command: +// +// mockgen -source=extension/queue/subscriber.go -destination=extension/queue/mock/subscriber.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + queue "github.com/uber/submitqueue/extension/queue" + gomock "go.uber.org/mock/gomock" +) + +// MockSubscriber is a mock of Subscriber interface. +type MockSubscriber struct { + ctrl *gomock.Controller + recorder *MockSubscriberMockRecorder + isgomock struct{} +} + +// MockSubscriberMockRecorder is the mock recorder for MockSubscriber. +type MockSubscriberMockRecorder struct { + mock *MockSubscriber +} + +// NewMockSubscriber creates a new mock instance. +func NewMockSubscriber(ctrl *gomock.Controller) *MockSubscriber { + mock := &MockSubscriber{ctrl: ctrl} + mock.recorder = &MockSubscriberMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSubscriber) EXPECT() *MockSubscriberMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockSubscriber) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockSubscriberMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSubscriber)(nil).Close)) +} + +// Subscribe mocks base method. +func (m *MockSubscriber) Subscribe(ctx context.Context, topic string, config queue.SubscriptionConfig) (<-chan queue.Delivery, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Subscribe", ctx, topic, config) + ret0, _ := ret[0].(<-chan queue.Delivery) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Subscribe indicates an expected call of Subscribe. +func (mr *MockSubscriberMockRecorder) Subscribe(ctx, topic, config any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockSubscriber)(nil).Subscribe), ctx, topic, config) +} diff --git a/extension/queue/publisher.go b/extension/queue/publisher.go index c77adbc9..4241f452 100644 --- a/extension/queue/publisher.go +++ b/extension/queue/publisher.go @@ -1,5 +1,7 @@ package queue +//go:generate mockgen -source=publisher.go -destination=mock/publisher.go -package=mock + import ( "context" diff --git a/extension/queue/queue.go b/extension/queue/queue.go index 4a327e39..45da4bc1 100644 --- a/extension/queue/queue.go +++ b/extension/queue/queue.go @@ -1,5 +1,7 @@ package queue +//go:generate mockgen -source=queue.go -destination=mock/queue.go -package=mock + // Queue creates and manages queue publishers and subscribers. // Implementations handle connection pooling, consumer group configuration, // and resource lifecycle. diff --git a/extension/queue/sql/subscriber_test.go b/extension/queue/sql/subscriber_test.go index 5d81107e..577d5d71 100644 --- a/extension/queue/sql/subscriber_test.go +++ b/extension/queue/sql/subscriber_test.go @@ -16,7 +16,7 @@ import ( ) func testSubscriptionConfig() extqueue.SubscriptionConfig { - return extqueue.DefaultSubscriptionConfig("test-subscriber", "test-consumer") + return extqueue.DefaultSubscriptionConfig("test-topic", "test-subscriber", "test-consumer") } func setupSubscriberTest(t *testing.T, mockMessageStore *MockmessageStore, mockOffsetStore *MockoffsetStore, mockLeaseStore *MockpartitionLeaseStore) extqueue.Subscriber { diff --git a/extension/queue/subscriber.go b/extension/queue/subscriber.go index 5c294bdc..4442660b 100644 --- a/extension/queue/subscriber.go +++ b/extension/queue/subscriber.go @@ -1,5 +1,7 @@ package queue +//go:generate mockgen -source=subscriber.go -destination=mock/subscriber.go -package=mock + import ( "context" ) diff --git a/extension/queue/subscription_config.go b/extension/queue/subscription_config.go index 89651e1e..9185aec0 100644 --- a/extension/queue/subscription_config.go +++ b/extension/queue/subscription_config.go @@ -4,6 +4,9 @@ package queue // Each subscription (topic) can have its own settings for polling, // batching, retries, and dead letter queue behavior. type SubscriptionConfig struct { + // Topic is the queue topic name to subscribe to. + Topic string + // SubscriberName uniquely identifies this subscriber instance for partition leases. // Different workers should use different names (e.g., hostname, pod name, UUID). // Combined with ConsumerGroup, this determines which worker owns a partition lease. @@ -65,8 +68,9 @@ type DLQConfig struct { } // DefaultSubscriptionConfig returns a SubscriptionConfig with sensible defaults. -func DefaultSubscriptionConfig(subscriberName, consumerGroup string) SubscriptionConfig { +func DefaultSubscriptionConfig(topic, subscriberName, consumerGroup string) SubscriptionConfig { return SubscriptionConfig{ + Topic: topic, SubscriberName: subscriberName, ConsumerGroup: consumerGroup, PollIntervalMs: 100, // 100ms diff --git a/extension/queue/subscription_config_test.go b/extension/queue/subscription_config_test.go index 59fe935e..48b93ff0 100644 --- a/extension/queue/subscription_config_test.go +++ b/extension/queue/subscription_config_test.go @@ -8,12 +8,14 @@ import ( ) func TestDefaultSubscriptionConfig(t *testing.T) { + topic := "test-topic" subscriberName := "test-worker" consumerGroup := "test-consumer" - config := DefaultSubscriptionConfig(subscriberName, consumerGroup) + config := DefaultSubscriptionConfig(topic, subscriberName, consumerGroup) // Verify required fields are set + assert.Equal(t, topic, config.Topic) assert.Equal(t, subscriberName, config.SubscriberName) assert.Equal(t, consumerGroup, config.ConsumerGroup) @@ -37,8 +39,8 @@ func TestDefaultSubscriptionConfig(t *testing.T) { func TestSubscriptionConfig_FieldsAreIndependent(t *testing.T) { // Create two configs and modify one to ensure they're independent - config1 := DefaultSubscriptionConfig("worker-1", "consumer-1") - config2 := DefaultSubscriptionConfig("worker-2", "consumer-2") + config1 := DefaultSubscriptionConfig("topic-1", "worker-1", "consumer-1") + config2 := DefaultSubscriptionConfig("topic-2", "worker-2", "consumer-2") // Modify config1 config1.PollIntervalMs = 500 @@ -56,7 +58,7 @@ func TestSubscriptionConfig_FieldsAreIndependent(t *testing.T) { } func TestSubscriptionConfig_CustomValues(t *testing.T) { - config := DefaultSubscriptionConfig("my-worker", "my-consumer") + config := DefaultSubscriptionConfig("my-topic", "my-worker", "my-consumer") // Override with custom values (in milliseconds) config.PollIntervalMs = 200 @@ -109,6 +111,7 @@ func TestSubscriptionConfig_ZeroValues(t *testing.T) { // Test that zero-value SubscriptionConfig can be created var config SubscriptionConfig + assert.Equal(t, "", config.Topic) assert.Equal(t, "", config.SubscriberName) assert.Equal(t, "", config.ConsumerGroup) assert.Equal(t, int64(0), config.PollIntervalMs) @@ -123,17 +126,19 @@ func TestSubscriptionConfig_ZeroValues(t *testing.T) { func TestSubscriptionConfig_DifferentConsumerGroups(t *testing.T) { // Test that different consumer groups get independent configs tests := []struct { + topic string subscriberName string consumerGroup string }{ - {"worker-1", "group-A"}, - {"worker-1", "group-B"}, - {"worker-2", "group-A"}, + {"topic-A", "worker-1", "group-A"}, + {"topic-B", "worker-1", "group-B"}, + {"topic-A", "worker-2", "group-A"}, } for _, tt := range tests { - t.Run(tt.subscriberName+"_"+tt.consumerGroup, func(t *testing.T) { - config := DefaultSubscriptionConfig(tt.subscriberName, tt.consumerGroup) + t.Run(tt.topic+"_"+tt.subscriberName+"_"+tt.consumerGroup, func(t *testing.T) { + config := DefaultSubscriptionConfig(tt.topic, tt.subscriberName, tt.consumerGroup) + require.Equal(t, tt.topic, config.Topic) require.Equal(t, tt.subscriberName, config.SubscriberName) require.Equal(t, tt.consumerGroup, config.ConsumerGroup) }) diff --git a/extension/storage/mysql/storage.go b/extension/storage/mysql/storage.go index 618a7486..12421e87 100644 --- a/extension/storage/mysql/storage.go +++ b/extension/storage/mysql/storage.go @@ -2,31 +2,12 @@ package mysql import ( "database/sql" - "fmt" - "time" _ "github.com/go-sql-driver/mysql" "github.com/uber/submitqueue/extension/storage" ) -// MySQLParameters defines the configuration for the MySQL storage. -// TODO: integrate with configuration system. -type MySQLParameters struct { - // DSN is the MySQL Data Source Name. - // Format: [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...¶mN=valueN] - DSN string - - // MaxOpenConns sets the maximum number of open connections to the database. 0 means unlimited. - MaxOpenConns int - - // MaxIdleConns sets the maximum number of idle connections in the pool. 0 means no idle connections are retained. - MaxIdleConns int - - // ConnMaxLifetime sets the maximum amount of time a connection may be reused. 0 means connections are not closed due to age. - ConnMaxLifetime time.Duration -} - type mysqlStorage struct { db *sql.DB requestStore storage.RequestStore @@ -34,22 +15,7 @@ type mysqlStorage struct { } // NewStorage creates a new MySQL storage. -func NewStorage(p MySQLParameters) (storage.Storage, error) { - db, err := sql.Open("mysql", p.DSN) - if err != nil { - return nil, fmt.Errorf("failed to open MySQL connection: %w", err) - } - - if p.MaxOpenConns > 0 { - db.SetMaxOpenConns(p.MaxOpenConns) - } - if p.MaxIdleConns > 0 { - db.SetMaxIdleConns(p.MaxIdleConns) - } - if p.ConnMaxLifetime > 0 { - db.SetConnMaxLifetime(p.ConnMaxLifetime) - } - +func NewStorage(db *sql.DB) (storage.Storage, error) { return &mysqlStorage{ db: db, requestStore: NewRequestStore(db), diff --git a/gateway/controller/land.go b/gateway/controller/land.go index d2b59718..27fe58e6 100644 --- a/gateway/controller/land.go +++ b/gateway/controller/land.go @@ -32,16 +32,19 @@ type LandController struct { store storage.Storage counter counter.Counter publisher extqueue.Publisher + topic string // Topic to publish requests to (e.g., "request", "land_request") } // NewLandController creates a new instance of the gateway land controller. -func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, counter counter.Counter, publisher extqueue.Publisher) *LandController { +// topic: the queue topic to publish requests to (e.g., "request", "land_request") +func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, counter counter.Counter, publisher extqueue.Publisher, topic string) *LandController { return &LandController{ logger: logger, metricsScope: scope, store: store, counter: counter, publisher: publisher, + topic: topic, } } @@ -119,7 +122,7 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan c.logger.Infow("request published to queue", "queue", req.Queue, "sqid", request.ID, - "topic", "request", + "topic", c.topic, ) c.metricsScope.Counter("publish_success").Inc(1) @@ -143,7 +146,7 @@ func (c *LandController) publishToQueue(ctx context.Context, request entity.Requ msg := queue.NewMessage(request.ID, payload, request.Queue, nil) // Publish to request topic - if err := c.publisher.Publish(ctx, "request", msg); err != nil { + if err := c.publisher.Publish(ctx, c.topic, msg); err != nil { return fmt.Errorf("failed to publish message: %w", err) } diff --git a/gateway/controller/land_test.go b/gateway/controller/land_test.go index 1140bc00..c724a5a1 100644 --- a/gateway/controller/land_test.go +++ b/gateway/controller/land_test.go @@ -103,7 +103,7 @@ func TestNewLandController(t *testing.T) { cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 1, nil }} - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher()) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(), "request") require.NotNil(t, controller) } @@ -123,7 +123,7 @@ func TestLand_ReturnsSqid(t *testing.T) { cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 1, nil }} - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher()) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -155,7 +155,7 @@ func TestLand_PassesCorrectParametersToStore(t *testing.T) { cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 42, nil }} - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher()) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -192,7 +192,7 @@ func TestLand_ReturnsErrorOnStorageFailure(t *testing.T) { cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 1, nil }} - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher()) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -220,7 +220,7 @@ func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) { cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 0, fmt.Errorf("counter unavailable") }} - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher()) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -251,7 +251,7 @@ func TestLand_CounterDomainIncludesQueue(t *testing.T) { capturedDomain = domain return 1, nil }} - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher()) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -273,7 +273,7 @@ func TestLand_ReturnsErrorOnEmptyQueue(t *testing.T) { cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 1, nil }} - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher()) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -295,7 +295,7 @@ func TestLand_ReturnsErrorOnEmptyChangeSource(t *testing.T) { cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 1, nil }} - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher()) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -317,7 +317,7 @@ func TestLand_ReturnsErrorOnNilChange(t *testing.T) { cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 1, nil }} - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher()) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -339,7 +339,7 @@ func TestLand_ReturnsErrorOnEmptyChangeIDs(t *testing.T) { cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 1, nil }} - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher()) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -370,7 +370,7 @@ func TestLand_PublishesToQueue(t *testing.T) { return nil }} - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, publisher) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, publisher, "request") ctx := context.Background() req := &pb.LandRequest{ @@ -413,7 +413,7 @@ func TestLand_ContinuesWhenPublishFails(t *testing.T) { return fmt.Errorf("queue unavailable") }} - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, publisher) + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, publisher, "request") ctx := context.Background() req := &pb.LandRequest{ diff --git a/orchestrator/controller/request/BUILD.bazel b/orchestrator/controller/request/BUILD.bazel index 7f04bc5e..d737d595 100644 --- a/orchestrator/controller/request/BUILD.bazel +++ b/orchestrator/controller/request/BUILD.bazel @@ -8,7 +8,7 @@ go_library( deps = [ "//core/consumer", "//entity", - "//extension/queue", + "//entity/queue", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -22,7 +22,6 @@ go_test( "//core/consumer", "//entity", "//entity/queue", - "//extension/queue", "//extension/queue/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/orchestrator/controller/request/request.go b/orchestrator/controller/request/request.go index 339b8975..3b6f9a9e 100644 --- a/orchestrator/controller/request/request.go +++ b/orchestrator/controller/request/request.go @@ -7,16 +7,18 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" - extqueue "github.com/uber/submitqueue/extension/queue" + entityqueue "github.com/uber/submitqueue/entity/queue" "go.uber.org/zap" ) // Controller handles request queue messages. +// It consumes requests, validates them, and publishes to the next stage. // Implements consumer.Controller interface for integration with the consumer. type Controller struct { - logger *zap.SugaredLogger - metricsScope tally.Scope - topic string + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + topic consumer.Topic consumerGroup string } @@ -24,17 +26,24 @@ type Controller struct { var _ consumer.Controller = (*Controller)(nil) // NewController creates a new request controller for the orchestrator. -func NewController(logger *zap.SugaredLogger, scope tally.Scope) *Controller { +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + registry consumer.TopicRegistry, + topic consumer.Topic, + consumerGroup string, +) *Controller { return &Controller{ - logger: logger.Named("request_controller"), - metricsScope: scope.SubScope("request_controller"), - topic: "request", - consumerGroup: "orchestrator-request", + logger: logger.Named("request_controller"), + metricsScope: scope.SubScope("request_controller"), + registry: registry, + topic: topic, + consumerGroup: consumerGroup, } } // Process processes a request delivery from the queue. -// Deserializes the request, logs the event, and prepares for future state transitions. +// Deserializes the request, validates it, and publishes to the batch topic. // Returns nil to ack (success), or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) @@ -67,22 +76,58 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "partition_key", msg.PartitionKey, ) - // TODO: Update request state to processing - // TODO: Perform validation checks - // TODO: Publish to next queue (requests_for_batching) + // TODO: Add validation logic + // - Merge Conflict Check + + // Publish to batch topic + if err := c.publish(ctx, consumer.TopicToBatch, request); err != nil { + c.logger.Errorw("failed to publish output", + "request_id", request.ID, + "topic", "batch", + "error", err, + ) + c.metricsScope.Counter("publish_errors").Inc(1) + return fmt.Errorf("failed to publish to batch: %w", err) + } + + c.logger.Infow("published request to next stage", + "request_id", request.ID, + "topic", "batch", + ) c.metricsScope.Counter("processed").Inc(1) return nil // Success - message will be acked } +// publish publishes a request to the specified topic. +func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request entity.Request) error { + payload, err := request.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize request: %w", err) + } + + msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + + q, ok := c.registry.Queue(topic) + if !ok { + return fmt.Errorf("no queue registered for topic %s", topic) + } + + if err := q.Publisher().Publish(ctx, topic.String(), msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + // Name returns the controller name for logging and metrics. func (c *Controller) Name() string { return "request" } // Topic returns the topic this controller subscribes to. -func (c *Controller) Topic() string { +func (c *Controller) Topic() consumer.Topic { return c.topic } @@ -90,22 +135,3 @@ func (c *Controller) Topic() string { func (c *Controller) ConsumerGroup() string { return c.consumerGroup } - -// SubscriptionConfig returns the subscription config for the request controller. -// Uses default settings which work well for request processing (100ms poll, 60s visibility timeout). -func (c *Controller) SubscriptionConfig(subscriberName string) extqueue.SubscriptionConfig { - config := extqueue.DefaultSubscriptionConfig(subscriberName, c.consumerGroup) - - // Request controller uses default settings: - // - PollInterval: 100ms (fast polling for immediate request processing) - // - BatchSize: 10 - // - VisibilityTimeout: 60s - // - Retry: 3 attempts with exponential backoff - // - DLQ: enabled - - // Can customize if needed: - // config.PollInterval = 50 * time.Millisecond // Even faster polling - // config.BatchSize = 20 // Process more requests at once - - return config -} diff --git a/orchestrator/controller/request/request_test.go b/orchestrator/controller/request/request_test.go index d22d7ec0..1752d192 100644 --- a/orchestrator/controller/request/request_test.go +++ b/orchestrator/controller/request/request_test.go @@ -11,33 +11,49 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" - extqueue "github.com/uber/submitqueue/extension/queue" - "github.com/uber/submitqueue/extension/queue/mock" + queuemock "github.com/uber/submitqueue/extension/queue/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) -func TestNewController(t *testing.T) { +// newTestController creates a controller with test dependencies. +func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - controller := NewController(logger, scope) + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg queue.Message) error { + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Topic: consumer.TopicToBatch, Queue: mockQ}}, + nil, + ) + + return NewController(logger, scope, registry, consumer.TopicRequest, "orchestrator-request") +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) require.NotNil(t, controller) - assert.Equal(t, "request", controller.Topic()) + assert.Equal(t, consumer.TopicRequest, controller.Topic()) assert.Equal(t, "orchestrator-request", controller.ConsumerGroup()) assert.Equal(t, "request", controller.Name()) } func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - defer ctrl.Finish() - logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope - controller := NewController(logger, scope) + controller := newTestController(t, ctrl, nil) - // Create a valid request request := entity.Request{ ID: "test-queue/123", Queue: "test-queue", @@ -47,42 +63,31 @@ func TestController_Process_Success(t *testing.T) { Version: 1, } - // Serialize to bytes payload, err := request.ToBytes() require.NoError(t, err) - // Create delivery with mock msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) - delivery := mock.NewMockDelivery(ctrl) + delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - // Handle the delivery - ctx := context.Background() - err = controller.Process(ctx, delivery) - - // Should return nil (success) + err = controller.Process(context.Background(), delivery) require.NoError(t, err) } func TestController_Process_InvalidJSON(t *testing.T) { ctrl := gomock.NewController(t) - defer ctrl.Finish() - logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope - controller := NewController(logger, scope) + controller := newTestController(t, ctrl, nil) - // Create delivery with invalid JSON invalidPayload := []byte(`{"invalid": json"}`) msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) - delivery := mock.NewMockDelivery(ctrl) + delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() // Process the delivery - ctx := context.Background() - err := controller.Process(ctx, delivery) + err := controller.Process(context.Background(), delivery) // Should return NonRetryableError for malformed messages require.Error(t, err) @@ -104,11 +109,8 @@ func TestController_Process_AllRequestStates(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) - defer ctrl.Finish() - logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope - controller := NewController(logger, scope) + controller := newTestController(t, ctrl, nil) request := entity.Request{ ID: fmt.Sprintf("queue/%s", tt.state), @@ -123,13 +125,11 @@ func TestController_Process_AllRequestStates(t *testing.T) { require.NoError(t, err) msg := queue.NewMessage(request.ID, payload, request.Queue, nil) - delivery := mock.NewMockDelivery(ctrl) + delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - ctx := context.Background() - err = controller.Process(ctx, delivery) - + err = controller.Process(context.Background(), delivery) require.NoError(t, err) }) } @@ -137,18 +137,15 @@ func TestController_Process_AllRequestStates(t *testing.T) { func TestController_Process_MultipleChanges(t *testing.T) { ctrl := gomock.NewController(t) - defer ctrl.Finish() - logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope - controller := NewController(logger, scope) + controller := newTestController(t, ctrl, nil) request := entity.Request{ ID: "queue/999", Queue: "test-queue", Change: entity.Change{ Source: "github", - IDs: []string{"PR-1", "PR-2", "PR-3"}, // Multiple PRs + IDs: []string{"PR-1", "PR-2", "PR-3"}, }, LandStrategy: entity.RequestLandStrategySquashRebase, State: entity.RequestStateNew, @@ -159,43 +156,43 @@ func TestController_Process_MultipleChanges(t *testing.T) { require.NoError(t, err) msg := queue.NewMessage(request.ID, payload, request.Queue, nil) - delivery := mock.NewMockDelivery(ctrl) + delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - ctx := context.Background() - err = controller.Process(ctx, delivery) - + err = controller.Process(context.Background(), delivery) require.NoError(t, err) } -func TestController_SubscriptionConfig(t *testing.T) { - logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope - controller := NewController(logger, scope) +func TestController_Process_PublishFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{Source: "github", IDs: []string{"PR-1"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } - config := controller.SubscriptionConfig("test-worker-123") + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() - assert.Equal(t, "test-worker-123", config.SubscriberName) - assert.Equal(t, "orchestrator-request", config.ConsumerGroup) - assert.Equal(t, int64(100), config.PollIntervalMs) // 100ms - assert.Equal(t, 10, config.BatchSize) - assert.Equal(t, int64(60000), config.VisibilityTimeoutMs) // 60s - assert.Equal(t, 3, config.Retry.MaxAttempts) - assert.True(t, config.DLQ.Enabled) + err = controller.Process(context.Background(), delivery) + assert.Error(t, err) } func TestController_InterfaceImplementation(t *testing.T) { - logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope - controller := NewController(logger, scope) - - // Verify implements consumer.Controller interface - var _ interface { - Process(ctx context.Context, delivery consumer.Delivery) error - Name() string - Topic() string - ConsumerGroup() string - SubscriptionConfig(subscriberName string) extqueue.SubscriptionConfig - } = controller + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) + + var _ consumer.Controller = controller } diff --git a/test/integration/extension/queue/sql/queue_test.go b/test/integration/extension/queue/sql/queue_test.go index 999edf10..71a048e5 100644 --- a/test/integration/extension/queue/sql/queue_test.go +++ b/test/integration/extension/queue/sql/queue_test.go @@ -135,7 +135,7 @@ func (s *SQLQueueIntegrationSuite) TestPublishAndSubscribe() { topic := "test_topic" // Subscribe first with config - subConfig := extqueue.DefaultSubscriptionConfig("test-worker-1", "test-consumer") + subConfig := extqueue.DefaultSubscriptionConfig(topic, "test-worker-1", "test-consumer") deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -205,7 +205,7 @@ func (s *SQLQueueIntegrationSuite) TestMultiplePartitions() { topic := "multi_partition_topic" // Subscribe - subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "multi-partition-consumer") + subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "multi-partition-consumer") deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -250,7 +250,7 @@ func (s *SQLQueueIntegrationSuite) TestVisibilityTimeoutAndRetry() { topic := "retry_topic" // Use short visibility timeout for faster test - subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "retry-consumer") + subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "retry-consumer") subConfig.VisibilityTimeoutMs = 2000 // 2 seconds subConfig.PollIntervalMs = 100 // 100 milliseconds @@ -337,7 +337,7 @@ func (s *SQLQueueIntegrationSuite) TestNackWithDelay() { topic := "nack_topic" // Subscribe - subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "nack-consumer") + subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "nack-consumer") subConfig.PollIntervalMs = 100 // 100 milliseconds deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -389,7 +389,7 @@ func (s *SQLQueueIntegrationSuite) TestIdempotentPublish() { topic := "idempotent_topic" // Subscribe - subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "idempotent-consumer") + subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "idempotent-consumer") deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -436,7 +436,7 @@ func (s *SQLQueueIntegrationSuite) TestConcurrentPublishers() { topic := "concurrent_topic" // Subscribe - subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "concurrent-consumer") + subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "concurrent-consumer") deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -491,7 +491,7 @@ func (s *SQLQueueIntegrationSuite) TestCrashRecovery() { topic := "crash_topic" // Use short timeouts for faster test - subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "crash-consumer") + subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "crash-consumer") subConfig.VisibilityTimeoutMs = 2000 // 2 seconds subConfig.PollIntervalMs = 100 // 100 milliseconds subConfig.LeaseDurationMs = 3000 // 3 seconds - short lease for testing crash recovery @@ -530,7 +530,7 @@ func (s *SQLQueueIntegrationSuite) TestCrashRecovery() { subscriber2 := q2.Subscriber() - subConfig2 := extqueue.DefaultSubscriptionConfig("worker-2", "crash-consumer") + subConfig2 := extqueue.DefaultSubscriptionConfig(topic, "worker-2", "crash-consumer") subConfig2.VisibilityTimeoutMs = 2000 // 2 seconds subConfig2.PollIntervalMs = 100 // 100 milliseconds subConfig2.LeaseDurationMs = 3000 // 3 seconds @@ -577,11 +577,11 @@ func (s *SQLQueueIntegrationSuite) TestMultipleConsumerGroups() { subscriber2 := q2.Subscriber() // Subscribe both groups - subConfig1 := extqueue.DefaultSubscriptionConfig("worker-1", "group-A") + subConfig1 := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "group-A") deliveryChan1, err := subscriber1.Subscribe(s.ctx, topic, subConfig1) require.NoError(t, err) - subConfig2 := extqueue.DefaultSubscriptionConfig("worker-1", "group-B") + subConfig2 := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "group-B") deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic, subConfig2) require.NoError(t, err) @@ -656,11 +656,11 @@ func (s *SQLQueueIntegrationSuite) TestMultipleWorkersInConsumerGroup() { subscriber2 := q2.Subscriber() // Subscribe both workers - subConfig1 := extqueue.DefaultSubscriptionConfig("worker-1", consumerGroup) + subConfig1 := extqueue.DefaultSubscriptionConfig(topic, "worker-1", consumerGroup) deliveryChan1, err := subscriber1.Subscribe(s.ctx, topic, subConfig1) require.NoError(t, err) - subConfig2 := extqueue.DefaultSubscriptionConfig("worker-2", consumerGroup) + subConfig2 := extqueue.DefaultSubscriptionConfig(topic, "worker-2", consumerGroup) deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic, subConfig2) require.NoError(t, err) @@ -776,7 +776,7 @@ func (s *SQLQueueIntegrationSuite) TestConcurrentSubscribers() { queues = append(queues, q) subscriber := q.Subscriber() - subConfig := extqueue.DefaultSubscriptionConfig(fmt.Sprintf("worker-%d", i), consumerGroup) + subConfig := extqueue.DefaultSubscriptionConfig(topic, fmt.Sprintf("worker-%d", i), consumerGroup) deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) deliveryChans = append(deliveryChans, deliveryChan) @@ -869,7 +869,7 @@ func (s *SQLQueueIntegrationSuite) TestDeadLetterQueue() { subscriber := q.Subscriber() // Configure with low max attempts and DLQ enabled - subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "dlq-consumer") + subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "dlq-consumer") subConfig.PollIntervalMs = 100 // 100 milliseconds subConfig.VisibilityTimeoutMs = 1000 // 1 second subConfig.Retry.MaxAttempts = 2 // Only 2 attempts before DLQ @@ -914,7 +914,7 @@ func (s *SQLQueueIntegrationSuite) TestDeadLetterQueue() { dlqTopic := topic + subConfig.DLQ.TopicSuffix t.Logf("Subscribing to DLQ topic: %s", dlqTopic) - dlqConfig := extqueue.DefaultSubscriptionConfig("worker-1", "dlq-consumer") + dlqConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "dlq-consumer") dlqDeliveryChan, err := subscriber.Subscribe(s.ctx, dlqTopic, dlqConfig) require.NoError(t, err) @@ -965,7 +965,7 @@ func (s *SQLQueueIntegrationSuite) TestMessageOrderingWithinPartition() { subscriber := q.Subscriber() // Subscribe first - subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "ordering-consumer") + subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "ordering-consumer") deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -1027,7 +1027,7 @@ func (s *SQLQueueIntegrationSuite) TestLateSubscriber() { // Now subscribe (late subscriber) subscriber := q.Subscriber() - subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "late-consumer") + subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "late-consumer") deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) t.Logf("Late subscriber joined after messages published") @@ -1066,7 +1066,7 @@ func (s *SQLQueueIntegrationSuite) TestEmptyTopicSubscribe() { subscriber := q.Subscriber() // Subscribe to empty topic (no messages published yet) - subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "empty-consumer") + subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "empty-consumer") subConfig.PollIntervalMs = 100 // 100 milliseconds deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -1110,7 +1110,7 @@ func (s *SQLQueueIntegrationSuite) TestGracefulShutdownDuringProcessing() { subscriber := q.Subscriber() // Subscribe - subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "shutdown-consumer") + subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "shutdown-consumer") subConfig.PollIntervalMs = 100 // 100 milliseconds deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -1174,7 +1174,7 @@ drainLoop: defer q2.Close() subscriber2 := q2.Subscriber() - subConfig2 := extqueue.DefaultSubscriptionConfig("worker-1", "shutdown-consumer") + subConfig2 := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "shutdown-consumer") deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic, subConfig2) require.NoError(t, err) diff --git a/test/integration/extension/storage/mysql/storage_test.go b/test/integration/extension/storage/mysql/storage_test.go index 9dd3444c..4f4c9d41 100644 --- a/test/integration/extension/storage/mysql/storage_test.go +++ b/test/integration/extension/storage/mysql/storage_test.go @@ -48,10 +48,6 @@ func (s *MySQLStorageIntegrationSuite) SetupSuite() { s.log.Logf("Compose stack started successfully") - // Get MySQL DSN for storage initialization - dsn, err := s.stack.MySQLServiceDSN("mysql") - require.NoError(t, err, "failed to get MySQL DSN") - // Connect to MySQL for schema application s.db, err = s.stack.ConnectMySQLService("mysql") require.NoError(t, err, "failed to connect to MySQL") @@ -62,10 +58,8 @@ func (s *MySQLStorageIntegrationSuite) SetupSuite() { s.log.Logf("Schemas applied successfully") - // Create storage instance - store, err := mysqlstorage.NewStorage(mysqlstorage.MySQLParameters{ - DSN: dsn, - }) + // Create storage instance using the existing database connection + store, err := mysqlstorage.NewStorage(s.db) require.NoError(t, err, "failed to create storage") // Provide the storage instance to the contract suite diff --git a/test/testutil/compose.go b/test/testutil/compose.go index 6eafad46..84aac5cf 100644 --- a/test/testutil/compose.go +++ b/test/testutil/compose.go @@ -24,7 +24,8 @@ type ComposeStack struct { t *testing.T log *TestLogger ctx context.Context - composeCmd []string // docker-compose command (either ["docker-compose"] or ["docker", "compose"]) + composeCmd []string // docker-compose command (either ["docker-compose"] or ["docker", "compose"]) + logCmd *exec.Cmd // background "docker compose logs -f" process } // getDockerComposeCommand returns the docker-compose command to use. @@ -68,16 +69,7 @@ func NewComposeStack(t *testing.T, log *TestLogger, ctx context.Context, compose // Register cleanup t.Cleanup(func() { - // Skip cleanup if test failed (for debugging) or SKIP_CLEANUP env var is set - if t.Failed() { - log.Logf("Test FAILED - keeping containers for debugging") - log.Logf("Container prefix: %s", projectName) - log.Logf("List containers: docker ps -a | grep %s", projectName) - log.Logf("View logs: docker logs %s--1", projectName) - composeCmd := strings.Join(stack.composeCmd, " ") - log.Logf("Clean up manually: %s -f %s -p %s down -v --rmi local", composeCmd, absPath, projectName) - return - } + stack.stopLogs() if os.Getenv("SKIP_CLEANUP") == "true" { log.Logf("SKIP_CLEANUP=true - keeping containers for inspection") @@ -95,11 +87,12 @@ func NewComposeStack(t *testing.T, log *TestLogger, ctx context.Context, compose } // Up starts all services in the compose stack. +// Uses --wait to block until all services with healthcheck directives are healthy. func (s *ComposeStack) Up() error { s.t.Helper() s.log.Logf("Starting compose stack from %s", s.composeFile) - args := append(s.composeCmd[1:], "-f", s.composeFile, "-p", s.projectName, "up", "-d", "--build") + args := append(s.composeCmd[1:], "-f", s.composeFile, "-p", s.projectName, "up", "-d", "--build", "--wait") cmd := exec.CommandContext(s.ctx, s.composeCmd[0], args...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr @@ -108,11 +101,8 @@ func (s *ComposeStack) Up() error { return fmt.Errorf("failed to start compose stack: %w", err) } - // Wait for services to be healthy - s.log.Logf("Waiting for services to be healthy...") - time.Sleep(5 * time.Second) // Simple wait for now - s.log.Logf("Compose stack started successfully") + s.tailLogs() return nil } @@ -131,6 +121,30 @@ func (s *ComposeStack) down() { } } +// tailLogs starts a background "docker compose logs -f" process that streams +// container logs to stderr in real time. Using os.Stderr instead of t.Log() +// because t.Log() buffers output until the test finishes. +func (s *ComposeStack) tailLogs() { + args := append(s.composeCmd[1:], "-f", s.composeFile, "-p", s.projectName, "logs", "-f") + cmd := exec.Command(s.composeCmd[0], args...) + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + + if err := cmd.Start(); err != nil { + s.log.Logf("Warning: failed to tail logs: %v", err) + return + } + s.logCmd = cmd +} + +// stopLogs kills the background log-tailing process if running. +func (s *ComposeStack) stopLogs() { + if s.logCmd != nil && s.logCmd.Process != nil { + s.logCmd.Process.Kill() + s.logCmd.Wait() + } +} + // ServicePort returns the mapped host port for a service's container port. func (s *ComposeStack) ServicePort(serviceName string, containerPort int) (int, error) { s.t.Helper()