Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use_repo(
"com_github_data_dog_go_sqlmock",
"com_github_go_sql_driver_mysql",
"com_github_gogo_protobuf",
"com_github_spf13_cobra",
"com_github_stretchr_testify",
"com_github_uber_go_tally_v4",
"org_golang_google_grpc",
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ LOCAL_PROJECT = submitqueue
# Set REPO_ROOT for docker-compose
export REPO_ROOT := $(shell pwd)

.PHONY: build build-all-linux build-gateway-linux build-orchestrator-linux clean clean-proto deps e2e-test gazelle integration-test integration-test-extensions integration-test-gateway integration-test-orchestrator local-clean local-gateway-start local-gateway-stop local-init-schemas local-logs local-orchestrator-start local-orchestrator-stop local-ps local-restart local-start local-stop proto query-deps query-targets run-client-gateway run-client-orchestrator run-client-speculator test test-no-cache help
.PHONY: build build-all-linux build-gateway-linux build-orchestrator-linux clean clean-proto deps e2e-test gazelle integration-test integration-test-extensions integration-test-gateway integration-test-orchestrator local-clean local-gateway-start local-gateway-stop local-init-schemas local-logs local-orchestrator-start local-orchestrator-stop local-ps local-restart local-start local-stop proto query-deps query-targets run-client-gateway run-client-orchestrator run-client-speculator run-queue-admin test test-no-cache help


build: ## Build all services and examples
Expand Down Expand Up @@ -227,6 +227,9 @@ run-client-orchestrator:
run-client-speculator:
@$(BAZEL) run //example/client/speculator:speculator -- -addr $(or $(SERVER_ADDR),localhost:8083) -message "$(or $(MESSAGE),ping)"

run-queue-admin: ## Run queue-admin CLI (use ARGS to pass arguments, e.g. make run-queue-admin ARGS="list-topics")
@$(BAZEL) run //extension/queue/mysql/ctl -- $(ARGS)

test: ## Run unit tests
@echo "Running unit tests..."
@$(BAZEL) test //... --test_tag_filters=-manual,-integration || echo "No unit tests found (only integration tests exist)"
Expand Down
13 changes: 13 additions & 0 deletions core/consumer/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@ const (
TopicFinalize Topic = "finalize"
)

// AllTopics returns all defined pipeline topics.
// Update this list when adding new topics.
var AllTopics = []Topic{
TopicRequest,
TopicToBatch,
TopicBatched,
TopicBuild,
TopicBuildSignal,
TopicToMerge,
TopicMergeSignal,
TopicFinalize,
}

// String returns the topic name as a string.
func (t Topic) String() string {
return string(t)
Expand Down
1 change: 1 addition & 0 deletions extension/queue/mysql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_test(
],
embed = [":mysql"],
deps = [
"//core/consumer",
"//entity/queue",
"//extension/queue",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
Expand Down
22 changes: 22 additions & 0 deletions extension/queue/mysql/ctl/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
load("@rules_go//go:def.bzl", "go_binary", "go_library")

go_library(
name = "ctl_lib",
srcs = [
"commands.go",
"main.go",
],
importpath = "github.com/uber/submitqueue/extension/queue/mysql/ctl",
visibility = ["//visibility:private"],
deps = [
"//extension/queue/mysql/ctl/lib",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_spf13_cobra//:cobra",
],
)

go_binary(
name = "ctl",
embed = [":ctl_lib"],
visibility = ["//visibility:public"],
)
146 changes: 146 additions & 0 deletions extension/queue/mysql/ctl/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Queue Admin CLI

Admin CLI for inspecting, managing, and troubleshooting the MySQL-backed message queue. Operates directly on the queue database tables (`queue_messages`, `queue_offsets`, `queue_partition_leases`).

## Setup

Start the local stack and find the MySQL queue port:

```bash
make local-start
make local-ps # note the MySQL Queue port
```

Set the DSN once for the session:

```bash
export QUEUE_MYSQL_DSN="root:root@tcp(localhost:<PORT>)/submitqueue"
```

Or pass `--dsn` on every command.

## Running

Via Make (uses Bazel):

```bash
make run-queue-admin ARGS="list-topics"
make run-queue-admin ARGS="topic-stats --topic merge_queue"
```

Via Bazel directly:

```bash
bazel run //extension/queue/mysql/ctl -- list-topics
bazel run //extension/queue/mysql/ctl -- topic-stats --topic merge_queue
```

## Commands

### Inspect Topics

```bash
# List all topics with message counts
queue-admin list-topics

# Detailed stats for a topic (visible/invisible messages, DLQ count, partitions, consumer groups)
queue-admin topic-stats --topic merge_queue
```

### Inspect Messages

```bash
# List messages (default limit 50)
queue-admin list-messages --topic merge_queue

# Filter by partition, custom limit
queue-admin list-messages --topic merge_queue --partition uber/cadence --limit 10

# Full message details including payload and metadata
queue-admin inspect-message --topic merge_queue --message-id msg-123
```

### Manage Messages

Destructive commands prompt for confirmation by default. Use `--no-interactive` to skip prompts (for scripting).

```bash
# Delete a single message
queue-admin delete-message --topic merge_queue --message-id msg-123

# Purge all messages from a topic
queue-admin purge-topic --topic merge_queue

# Skip confirmation prompt (for scripting)
queue-admin purge-topic --topic merge_queue --no-interactive
```

### Dead Letter Queue (DLQ)

DLQ messages live in the same `queue_messages` table under `topic + "_dlq"` (default suffix).

```bash
# List DLQ messages
queue-admin list-dlq --topic merge_queue

# Inspect a DLQ message (use the DLQ topic name)
queue-admin inspect-message --topic merge_queue_dlq --message-id msg-456

# Move a DLQ message back to the original topic
queue-admin requeue-dlq --topic merge_queue --message-id msg-456

# Purge all DLQ messages
queue-admin purge-dlq --topic merge_queue

# Custom DLQ suffix (if not using default "_dlq")
queue-admin list-dlq --topic merge_queue --dlq-suffix _dead
```

### Consumer Lag

```bash
# Per-partition lag for all consumer groups on a topic
queue-admin consumer-lag --topic merge_queue
```

Output shows `ACKED` (last processed offset), `LATEST` (newest message offset), and `LAG` (unprocessed count) per partition per consumer group.

### Consumer Offsets

```bash
# List all consumer group offsets
queue-admin list-offsets

# Filter by consumer group
queue-admin list-offsets --consumer-group orchestrator

# Reset offset to 0 (reprocess all messages)
queue-admin reset-offset --consumer-group orchestrator --topic merge_queue --partition uber/cadence

# Reset to a specific offset
queue-admin reset-offset --consumer-group orchestrator --topic merge_queue --partition uber/cadence --offset 42
```

### Partition Leases

```bash
# List all active partition leases (who owns what)
queue-admin list-leases

# Find stale leases (not renewed within threshold, likely dead workers)
queue-admin stale-leases # default 60s threshold
queue-admin stale-leases --threshold 30000 # 30s threshold

# Force-release a stuck lease
queue-admin release-lease --consumer-group orchestrator --topic merge_queue --partition uber/cadence
```

### JSON Output

Add `--json` to any read command for machine-readable output:

```bash
queue-admin list-topics --json
queue-admin consumer-lag --topic merge_queue --json
queue-admin list-messages --topic merge_queue --json | jq '.[] | .ID'
```
Loading