Skip to content

Commit 39e6a79

Browse files
committed
feat(queue): complete Gateway → Orchestrator queue integration
## Summary Wire up the first stage of the queue pipeline: Gateway publishes land requests to the queue, Orchestrator consumes and processes them. Queue infrastructure is optional to maintain backward compatibility with existing integration tests. ## Why? The orchestrator needs to process requests asynchronously through a multi-stage queue pipeline. This establishes the foundation with reusable consumer infrastructure that scales to 10+ queues. ## What? **Consumer Infrastructure:** - Add Consumer interface (Register/Start/Stop) for orchestrating multiple controllers - Add Controller interface for queue message processing with per-subscription config - Implement subscription lifecycle, automatic ack/nack, metrics, graceful shutdown - Configurable shutdown timeout (timeoutMs parameter) **Gateway:** - Modify Land controller to publish requests to land_request queue after storage - Add publisher metrics and error handling - Queue infrastructure optional (controlled by QUEUE_MYSQL_DSN env var) - Land API returns error if queue not configured **Orchestrator:** - Add Request controller to consume from land_request queue - Wire up consumer with graceful shutdown in main.go - Queue infrastructure optional (controlled by QUEUE_MYSQL_DSN env var) - Server starts without queue for Ping API testing **Entities:** - Add Request JSON serialization (ToBytes/FromBytes) **CLAUDE.md:** - Extend time guideline to use int64 milliseconds for timestamps AND durations - Add code style guidelines: use SugaredLogger, use interfaces for contracts **Integration Tests:** - All 3 service integration tests pass without MySQL - Servers start successfully when queue infrastructure not configured - Backward compatible with existing Ping API tests ## Test Plan - 607 lines of consumer tests with channel-based synchronization - Request entity serialization tests - Gateway publisher integration tests - Request controller tests with various states - All unit tests pass - All integration tests pass (gateway, orchestrator, speculator)
1 parent f2b4750 commit 39e6a79

20 files changed

Lines changed: 1861 additions & 42 deletions

File tree

CLAUDE.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,10 @@ entity/
7070
**Entity guidelines:**
7171
1. Keep entities pure and framework-agnostic — no external dependencies
7272
2. Use value types, not references
73-
3. Prefer `int64` Unix epoch milliseconds over `time.Time`
73+
3. Prefer `int64` milliseconds over `time.Time` and `time.Duration`:
74+
- Timestamps: Unix epoch milliseconds (e.g., `CreatedAt int64`) — use `time.UnixMilli()` method
75+
- Durations/timeouts: milliseconds (e.g., `TimeoutMs int64`, `DelayMs int64`)
76+
- Use `time.Duration(ms) * time.Millisecond` to convert to `time.Duration` when needed
7477
4. Every field must have a comment explaining its meaning
7578
5. Reference other entities by ID (string or int), not directly
7679
6. Use string enums with clear names; assign sentinel values (`""` for strings, `0` for ints) to unreachable/unknown enum variants
@@ -204,3 +207,20 @@ make clean-proto # Remove generated proto files
204207
1. **Avoid asserting on error messages** — assert on error type if it is part of the contract, or assert generic error otherwise.
205208
2. **Avoid blocking operations for synchronization** — do not use `time.Sleep`. Design the tested routine to signal back (channels, callbacks, condition variables).
206209
3. **Use testify assertions** — use `stretchr/assert` or `require` instead of `t.Fatal()`.
210+
211+
### Code Style Guidelines
212+
213+
1. **Use SugaredLogger for structured logging** — always use `zap.SugaredLogger` with structured logging methods:
214+
- `logger.Debugw(msg, key1, val1, key2, val2, ...)` for debug logs
215+
- `logger.Infow(msg, key1, val1, key2, val2, ...)` for info logs
216+
- `logger.Errorw(msg, key1, val1, key2, val2, ...)` for error logs
217+
- Never use unstructured methods like `Debug()`, `Info()`, `Error()`, or `Printf()`
218+
- Example: `logger.Infow("starting consumer", "subscriber_name", subscriberName, "controller_count", len(controllers))`
219+
220+
2. **Use interfaces for contracts** — define interfaces for public APIs and dependencies:
221+
- Public components should return/accept interfaces, not concrete structs
222+
- Unexported structs implement the interfaces
223+
- Makes testing easier through mocking
224+
- Example: `func New(...) Consumer` returns interface, not `*consumer`
225+
- Implementation struct is unexported: `type consumer struct { ... }`
226+

consumer/BUILD.bazel

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "consumer",
5+
srcs = [
6+
"consumer.go",
7+
"controller.go",
8+
],
9+
importpath = "github.com/uber/submitqueue/consumer",
10+
visibility = ["//visibility:public"],
11+
deps = [
12+
"//extension/queue",
13+
"@com_github_uber_go_tally_v4//:tally",
14+
"@org_uber_go_zap//:zap",
15+
],
16+
)
17+
18+
go_test(
19+
name = "consumer_test",
20+
srcs = ["consumer_test.go"],
21+
embed = [":consumer"],
22+
deps = [
23+
"//entity/queue",
24+
"//extension/queue",
25+
"@com_github_stretchr_testify//assert",
26+
"@com_github_stretchr_testify//require",
27+
"@com_github_uber_go_tally_v4//:tally",
28+
"@org_uber_go_zap//zaptest",
29+
],
30+
)

0 commit comments

Comments
 (0)