Skip to content

Commit d3bb110

Browse files
authored
Merge branch 'main' into kevin.new/queueconfig-buildprovider
2 parents a891df6 + eb8c3ba commit d3bb110

21 files changed

Lines changed: 2164 additions & 13 deletions

File tree

MODULE.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use_repo(
3333
"com_github_data_dog_go_sqlmock",
3434
"com_github_go_sql_driver_mysql",
3535
"com_github_gogo_protobuf",
36+
"com_github_spf13_cobra",
3637
"com_github_stretchr_testify",
3738
"com_github_uber_go_tally_v4",
3839
"org_golang_google_grpc",

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ LOCAL_PROJECT = submitqueue
1313
# Set REPO_ROOT for docker-compose
1414
export REPO_ROOT := $(shell pwd)
1515

16-
.PHONY: build build-all-linux build-gateway-linux build-orchestrator-linux clean clean-proto deps e2e-test gazelle integration-test integration-test-extensions integration-test-gateway integration-test-orchestrator local-clean local-gateway-start local-gateway-stop local-init-schemas local-logs local-orchestrator-start local-orchestrator-stop local-ps local-restart local-start local-stop proto query-deps query-targets run-client-gateway run-client-orchestrator run-client-speculator test test-no-cache help
16+
.PHONY: build build-all-linux build-gateway-linux build-orchestrator-linux clean clean-proto deps e2e-test gazelle integration-test integration-test-extensions integration-test-gateway integration-test-orchestrator local-clean local-gateway-start local-gateway-stop local-init-schemas local-logs local-orchestrator-start local-orchestrator-stop local-ps local-restart local-start local-stop proto query-deps query-targets run-client-gateway run-client-orchestrator run-client-speculator run-queue-admin test test-no-cache help
1717

1818

1919
build: ## Build all services and examples
@@ -227,6 +227,9 @@ run-client-orchestrator:
227227
run-client-speculator:
228228
@$(BAZEL) run //example/client/speculator:speculator -- -addr $(or $(SERVER_ADDR),localhost:8083) -message "$(or $(MESSAGE),ping)"
229229

230+
run-queue-admin: ## Run queue-admin CLI (use ARGS to pass arguments, e.g. make run-queue-admin ARGS="list-topics")
231+
@$(BAZEL) run //extension/queue/mysql/ctl -- $(ARGS)
232+
230233
test: ## Run unit tests
231234
@echo "Running unit tests..."
232235
@$(BAZEL) test //... --test_tag_filters=-manual,-integration || echo "No unit tests found (only integration tests exist)"

core/consumer/registry.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,19 @@ const (
2424
TopicFinalize Topic = "finalize"
2525
)
2626

27+
// AllTopics returns all defined pipeline topics.
28+
// Update this list when adding new topics.
29+
var AllTopics = []Topic{
30+
TopicRequest,
31+
TopicToBatch,
32+
TopicBatched,
33+
TopicBuild,
34+
TopicBuildSignal,
35+
TopicToMerge,
36+
TopicMergeSignal,
37+
TopicFinalize,
38+
}
39+
2740
// String returns the topic name as a string.
2841
func (t Topic) String() string {
2942
return string(t)

entity/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ go_library(
1818
go_test(
1919
name = "entity_test",
2020
srcs = [
21+
"build_test.go",
2122
"queue_config_test.go",
2223
"request_test.go",
2324
],

entity/build.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,40 @@ package entity
44
type BuildStatus string
55

66
const (
7-
// BuildStateUnknown is the unreachable state. It is set by default when the structure is initialized. It should never be seen in the system.
8-
BuildStateUnknown BuildStatus = ""
9-
// TODO: Add comprehensive list of known build states.
7+
// BuildStatusUnknown is the unreachable state. It is set by default when the structure is initialized. It should never be seen in the system.
8+
BuildStatusUnknown BuildStatus = ""
9+
10+
// BuildStatusQueued indicates the build has been scheduled but not yet started.
11+
BuildStatusQueued BuildStatus = "queued"
12+
13+
// BuildStatusRunning indicates the build is currently executing.
14+
BuildStatusRunning BuildStatus = "running"
15+
16+
// BuildStatusPassed indicates the build completed successfully.
17+
// This is a terminal state.
18+
BuildStatusPassed BuildStatus = "passed"
19+
20+
// BuildStatusFailed indicates the build completed with failures.
21+
// This is a terminal state.
22+
BuildStatusFailed BuildStatus = "failed"
23+
24+
// BuildStatusCancelled indicates the build was cancelled before completion.
25+
// This is a terminal state.
26+
BuildStatusCancelled BuildStatus = "cancelled"
27+
28+
// BuildStatusBlocked indicates the build is waiting for manual approval or unblocking.
29+
// Some CI systems (like BuildKite) support manual approval steps.
30+
BuildStatusBlocked BuildStatus = "blocked"
1031
)
1132

33+
// IsTerminal returns true if the build state represents a final state (passed, failed, or cancelled).
34+
// Terminal states indicate the build has finished and will not change state again.
35+
// Note: BuildStatusBlocked is NOT terminal as blocked builds can be unblocked and continue execution.
36+
func (s BuildStatus) IsTerminal() bool {
37+
return s == BuildStatusPassed || s == BuildStatusFailed || s == BuildStatusCancelled
38+
}
39+
40+
1241
// SpeculationPathInfo represents the base and head commits of a speculation path used in a build.
1342
type SpeculationPathInfo struct {
1443
// Base is a list of batchIDs(in order) that form the base of this speculation path.

entity/build_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package entity
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestBuildStatus_IsTerminal(t *testing.T) {
10+
tests := []struct {
11+
name string
12+
status BuildStatus
13+
expected bool
14+
}{
15+
{
16+
name: "passed is terminal",
17+
status: BuildStatusPassed,
18+
expected: true,
19+
},
20+
{
21+
name: "failed is terminal",
22+
status: BuildStatusFailed,
23+
expected: true,
24+
},
25+
{
26+
name: "cancelled is terminal",
27+
status: BuildStatusCancelled,
28+
expected: true,
29+
},
30+
{
31+
name: "queued is not terminal",
32+
status: BuildStatusQueued,
33+
expected: false,
34+
},
35+
{
36+
name: "running is not terminal",
37+
status: BuildStatusRunning,
38+
expected: false,
39+
},
40+
{
41+
name: "blocked is not terminal",
42+
status: BuildStatusBlocked,
43+
expected: false,
44+
},
45+
{
46+
name: "unknown is not terminal",
47+
status: BuildStatusUnknown,
48+
expected: false,
49+
},
50+
}
51+
52+
for _, tt := range tests {
53+
t.Run(tt.name, func(t *testing.T) {
54+
assert.Equal(t, tt.expected, tt.status.IsTerminal())
55+
})
56+
}
57+
}

extension/queue/mysql/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ go_test(
3838
],
3939
embed = [":mysql"],
4040
deps = [
41+
"//core/consumer",
4142
"//entity/queue",
4243
"//extension/queue",
4344
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
load("@rules_go//go:def.bzl", "go_binary", "go_library")
2+
3+
go_library(
4+
name = "ctl_lib",
5+
srcs = [
6+
"commands.go",
7+
"main.go",
8+
],
9+
importpath = "github.com/uber/submitqueue/extension/queue/mysql/ctl",
10+
visibility = ["//visibility:private"],
11+
deps = [
12+
"//extension/queue/mysql/ctl/lib",
13+
"@com_github_go_sql_driver_mysql//:mysql",
14+
"@com_github_spf13_cobra//:cobra",
15+
],
16+
)
17+
18+
go_binary(
19+
name = "ctl",
20+
embed = [":ctl_lib"],
21+
visibility = ["//visibility:public"],
22+
)
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# Queue Admin CLI
2+
3+
Admin CLI for inspecting, managing, and troubleshooting the MySQL-backed message queue. Operates directly on the queue database tables (`queue_messages`, `queue_offsets`, `queue_partition_leases`).
4+
5+
## Setup
6+
7+
Start the local stack and find the MySQL queue port:
8+
9+
```bash
10+
make local-start
11+
make local-ps # note the MySQL Queue port
12+
```
13+
14+
Set the DSN once for the session:
15+
16+
```bash
17+
export QUEUE_MYSQL_DSN="root:root@tcp(localhost:<PORT>)/submitqueue"
18+
```
19+
20+
Or pass `--dsn` on every command.
21+
22+
## Running
23+
24+
Via Make (uses Bazel):
25+
26+
```bash
27+
make run-queue-admin ARGS="list-topics"
28+
make run-queue-admin ARGS="topic-stats --topic merge_queue"
29+
```
30+
31+
Via Bazel directly:
32+
33+
```bash
34+
bazel run //extension/queue/mysql/ctl -- list-topics
35+
bazel run //extension/queue/mysql/ctl -- topic-stats --topic merge_queue
36+
```
37+
38+
## Commands
39+
40+
### Inspect Topics
41+
42+
```bash
43+
# List all topics with message counts
44+
queue-admin list-topics
45+
46+
# Detailed stats for a topic (visible/invisible messages, DLQ count, partitions, consumer groups)
47+
queue-admin topic-stats --topic merge_queue
48+
```
49+
50+
### Inspect Messages
51+
52+
```bash
53+
# List messages (default limit 50)
54+
queue-admin list-messages --topic merge_queue
55+
56+
# Filter by partition, custom limit
57+
queue-admin list-messages --topic merge_queue --partition uber/cadence --limit 10
58+
59+
# Full message details including payload and metadata
60+
queue-admin inspect-message --topic merge_queue --message-id msg-123
61+
```
62+
63+
### Manage Messages
64+
65+
Destructive commands prompt for confirmation by default. Use `--no-interactive` to skip prompts (for scripting).
66+
67+
```bash
68+
# Delete a single message
69+
queue-admin delete-message --topic merge_queue --message-id msg-123
70+
71+
# Purge all messages from a topic
72+
queue-admin purge-topic --topic merge_queue
73+
74+
# Skip confirmation prompt (for scripting)
75+
queue-admin purge-topic --topic merge_queue --no-interactive
76+
```
77+
78+
### Dead Letter Queue (DLQ)
79+
80+
DLQ messages live in the same `queue_messages` table under `topic + "_dlq"` (default suffix).
81+
82+
```bash
83+
# List DLQ messages
84+
queue-admin list-dlq --topic merge_queue
85+
86+
# Inspect a DLQ message (use the DLQ topic name)
87+
queue-admin inspect-message --topic merge_queue_dlq --message-id msg-456
88+
89+
# Move a DLQ message back to the original topic
90+
queue-admin requeue-dlq --topic merge_queue --message-id msg-456
91+
92+
# Purge all DLQ messages
93+
queue-admin purge-dlq --topic merge_queue
94+
95+
# Custom DLQ suffix (if not using default "_dlq")
96+
queue-admin list-dlq --topic merge_queue --dlq-suffix _dead
97+
```
98+
99+
### Consumer Lag
100+
101+
```bash
102+
# Per-partition lag for all consumer groups on a topic
103+
queue-admin consumer-lag --topic merge_queue
104+
```
105+
106+
Output shows `ACKED` (last processed offset), `LATEST` (newest message offset), and `LAG` (unprocessed count) per partition per consumer group.
107+
108+
### Consumer Offsets
109+
110+
```bash
111+
# List all consumer group offsets
112+
queue-admin list-offsets
113+
114+
# Filter by consumer group
115+
queue-admin list-offsets --consumer-group orchestrator
116+
117+
# Reset offset to 0 (reprocess all messages)
118+
queue-admin reset-offset --consumer-group orchestrator --topic merge_queue --partition uber/cadence
119+
120+
# Reset to a specific offset
121+
queue-admin reset-offset --consumer-group orchestrator --topic merge_queue --partition uber/cadence --offset 42
122+
```
123+
124+
### Partition Leases
125+
126+
```bash
127+
# List all active partition leases (who owns what)
128+
queue-admin list-leases
129+
130+
# Find stale leases (not renewed within threshold, likely dead workers)
131+
queue-admin stale-leases # default 60s threshold
132+
queue-admin stale-leases --threshold 30000 # 30s threshold
133+
134+
# Force-release a stuck lease
135+
queue-admin release-lease --consumer-group orchestrator --topic merge_queue --partition uber/cadence
136+
```
137+
138+
### JSON Output
139+
140+
Add `--json` to any read command for machine-readable output:
141+
142+
```bash
143+
queue-admin list-topics --json
144+
queue-admin consumer-lag --topic merge_queue --json
145+
queue-admin list-messages --topic merge_queue --json | jq '.[] | .ID'
146+
```

0 commit comments

Comments
 (0)