diff --git a/MODULE.bazel b/MODULE.bazel index ad92bd90..28fb6a43 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -33,6 +33,7 @@ use_repo( "com_github_data_dog_go_sqlmock", "com_github_go_sql_driver_mysql", "com_github_gogo_protobuf", + "com_github_spf13_cobra", "com_github_stretchr_testify", "com_github_uber_go_tally_v4", "org_golang_google_grpc", diff --git a/Makefile b/Makefile index 0c141809..e2181f6e 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ LOCAL_PROJECT = submitqueue # Set REPO_ROOT for docker-compose export REPO_ROOT := $(shell pwd) -.PHONY: build build-all-linux build-gateway-linux build-orchestrator-linux clean clean-proto deps e2e-test gazelle integration-test integration-test-extensions integration-test-gateway integration-test-orchestrator local-clean local-gateway-start local-gateway-stop local-init-schemas local-logs local-orchestrator-start local-orchestrator-stop local-ps local-restart local-start local-stop proto query-deps query-targets run-client-gateway run-client-orchestrator run-client-speculator test test-no-cache help +.PHONY: build build-all-linux build-gateway-linux build-orchestrator-linux clean clean-proto deps e2e-test gazelle integration-test integration-test-extensions integration-test-gateway integration-test-orchestrator local-clean local-gateway-start local-gateway-stop local-init-schemas local-logs local-orchestrator-start local-orchestrator-stop local-ps local-restart local-start local-stop proto query-deps query-targets run-client-gateway run-client-orchestrator run-client-speculator run-queue-admin test test-no-cache help build: ## Build all services and examples @@ -227,6 +227,9 @@ run-client-orchestrator: run-client-speculator: @$(BAZEL) run //example/client/speculator:speculator -- -addr $(or $(SERVER_ADDR),localhost:8083) -message "$(or $(MESSAGE),ping)" +run-queue-admin: ## Run queue-admin CLI (use ARGS to pass arguments, e.g. make run-queue-admin ARGS="list-topics") + @$(BAZEL) run //extension/queue/mysql/ctl -- $(ARGS) + test: ## Run unit tests @echo "Running unit tests..." @$(BAZEL) test //... --test_tag_filters=-manual,-integration || echo "No unit tests found (only integration tests exist)" diff --git a/core/consumer/registry.go b/core/consumer/registry.go index 51b42ec0..7cbf65cd 100644 --- a/core/consumer/registry.go +++ b/core/consumer/registry.go @@ -24,6 +24,19 @@ const ( TopicFinalize Topic = "finalize" ) +// AllTopics returns all defined pipeline topics. +// Update this list when adding new topics. +var AllTopics = []Topic{ + TopicRequest, + TopicToBatch, + TopicBatched, + TopicBuild, + TopicBuildSignal, + TopicToMerge, + TopicMergeSignal, + TopicFinalize, +} + // String returns the topic name as a string. func (t Topic) String() string { return string(t) diff --git a/extension/queue/mysql/BUILD.bazel b/extension/queue/mysql/BUILD.bazel index 6ce04969..e96e51cd 100644 --- a/extension/queue/mysql/BUILD.bazel +++ b/extension/queue/mysql/BUILD.bazel @@ -38,6 +38,7 @@ go_test( ], embed = [":mysql"], deps = [ + "//core/consumer", "//entity/queue", "//extension/queue", "@com_github_data_dog_go_sqlmock//:go-sqlmock", diff --git a/extension/queue/mysql/ctl/BUILD.bazel b/extension/queue/mysql/ctl/BUILD.bazel new file mode 100644 index 00000000..e9bf2f4b --- /dev/null +++ b/extension/queue/mysql/ctl/BUILD.bazel @@ -0,0 +1,22 @@ +load("@rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "ctl_lib", + srcs = [ + "commands.go", + "main.go", + ], + importpath = "github.com/uber/submitqueue/extension/queue/mysql/ctl", + visibility = ["//visibility:private"], + deps = [ + "//extension/queue/mysql/ctl/lib", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_spf13_cobra//:cobra", + ], +) + +go_binary( + name = "ctl", + embed = [":ctl_lib"], + visibility = ["//visibility:public"], +) diff --git a/extension/queue/mysql/ctl/README.md b/extension/queue/mysql/ctl/README.md new file mode 100644 index 00000000..1d63c0d4 --- /dev/null +++ b/extension/queue/mysql/ctl/README.md @@ -0,0 +1,146 @@ +# Queue Admin CLI + +Admin CLI for inspecting, managing, and troubleshooting the MySQL-backed message queue. Operates directly on the queue database tables (`queue_messages`, `queue_offsets`, `queue_partition_leases`). + +## Setup + +Start the local stack and find the MySQL queue port: + +```bash +make local-start +make local-ps # note the MySQL Queue port +``` + +Set the DSN once for the session: + +```bash +export QUEUE_MYSQL_DSN="root:root@tcp(localhost:)/submitqueue" +``` + +Or pass `--dsn` on every command. + +## Running + +Via Make (uses Bazel): + +```bash +make run-queue-admin ARGS="list-topics" +make run-queue-admin ARGS="topic-stats --topic merge_queue" +``` + +Via Bazel directly: + +```bash +bazel run //extension/queue/mysql/ctl -- list-topics +bazel run //extension/queue/mysql/ctl -- topic-stats --topic merge_queue +``` + +## Commands + +### Inspect Topics + +```bash +# List all topics with message counts +queue-admin list-topics + +# Detailed stats for a topic (visible/invisible messages, DLQ count, partitions, consumer groups) +queue-admin topic-stats --topic merge_queue +``` + +### Inspect Messages + +```bash +# List messages (default limit 50) +queue-admin list-messages --topic merge_queue + +# Filter by partition, custom limit +queue-admin list-messages --topic merge_queue --partition uber/cadence --limit 10 + +# Full message details including payload and metadata +queue-admin inspect-message --topic merge_queue --message-id msg-123 +``` + +### Manage Messages + +Destructive commands prompt for confirmation by default. Use `--no-interactive` to skip prompts (for scripting). + +```bash +# Delete a single message +queue-admin delete-message --topic merge_queue --message-id msg-123 + +# Purge all messages from a topic +queue-admin purge-topic --topic merge_queue + +# Skip confirmation prompt (for scripting) +queue-admin purge-topic --topic merge_queue --no-interactive +``` + +### Dead Letter Queue (DLQ) + +DLQ messages live in the same `queue_messages` table under `topic + "_dlq"` (default suffix). + +```bash +# List DLQ messages +queue-admin list-dlq --topic merge_queue + +# Inspect a DLQ message (use the DLQ topic name) +queue-admin inspect-message --topic merge_queue_dlq --message-id msg-456 + +# Move a DLQ message back to the original topic +queue-admin requeue-dlq --topic merge_queue --message-id msg-456 + +# Purge all DLQ messages +queue-admin purge-dlq --topic merge_queue + +# Custom DLQ suffix (if not using default "_dlq") +queue-admin list-dlq --topic merge_queue --dlq-suffix _dead +``` + +### Consumer Lag + +```bash +# Per-partition lag for all consumer groups on a topic +queue-admin consumer-lag --topic merge_queue +``` + +Output shows `ACKED` (last processed offset), `LATEST` (newest message offset), and `LAG` (unprocessed count) per partition per consumer group. + +### Consumer Offsets + +```bash +# List all consumer group offsets +queue-admin list-offsets + +# Filter by consumer group +queue-admin list-offsets --consumer-group orchestrator + +# Reset offset to 0 (reprocess all messages) +queue-admin reset-offset --consumer-group orchestrator --topic merge_queue --partition uber/cadence + +# Reset to a specific offset +queue-admin reset-offset --consumer-group orchestrator --topic merge_queue --partition uber/cadence --offset 42 +``` + +### Partition Leases + +```bash +# List all active partition leases (who owns what) +queue-admin list-leases + +# Find stale leases (not renewed within threshold, likely dead workers) +queue-admin stale-leases # default 60s threshold +queue-admin stale-leases --threshold 30000 # 30s threshold + +# Force-release a stuck lease +queue-admin release-lease --consumer-group orchestrator --topic merge_queue --partition uber/cadence +``` + +### JSON Output + +Add `--json` to any read command for machine-readable output: + +```bash +queue-admin list-topics --json +queue-admin consumer-lag --topic merge_queue --json +queue-admin list-messages --topic merge_queue --json | jq '.[] | .ID' +``` diff --git a/extension/queue/mysql/ctl/commands.go b/extension/queue/mysql/ctl/commands.go new file mode 100644 index 00000000..6bb87c1a --- /dev/null +++ b/extension/queue/mysql/ctl/commands.go @@ -0,0 +1,570 @@ +package main + +import ( + "bufio" + "database/sql" + "fmt" + "os" + "strconv" + "strings" + + "github.com/spf13/cobra" + "github.com/uber/submitqueue/extension/queue/mysql/ctl/lib" +) + +// confirmAction prompts the user for confirmation unless --no-interactive is set. +// Returns nil if confirmed, error if declined. +func confirmAction(noInteractive bool, message string) error { + if noInteractive { + return nil + } + fmt.Fprintf(os.Stderr, "%s [y/N]: ", message) + reader := bufio.NewReader(os.Stdin) + answer, err := reader.ReadString('\n') + if err != nil { + return fmt.Errorf("failed to read confirmation: %w", err) + } + answer = strings.TrimSpace(strings.ToLower(answer)) + if answer != "y" && answer != "yes" { + return fmt.Errorf("operation cancelled") + } + return nil +} + +// newRootCmd creates the root cobra command with all subcommands wired up. +// The returned command handles --dsn and --json persistent flags. +func newRootCmd() *cobra.Command { + var ( + dsn string + jsonOut bool + noInteractive bool + store *lib.AdminStore + db *sql.DB + ) + + rootCmd := &cobra.Command{ + Use: "queue-admin", + Short: "Admin CLI for the MySQL queue extension", + Long: "Inspect, manage, and troubleshoot the MySQL-backed message queue.", + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + if dsn == "" { + dsn = os.Getenv("QUEUE_MYSQL_DSN") + } + if dsn == "" { + return fmt.Errorf("--dsn flag or QUEUE_MYSQL_DSN env var is required") + } + var err error + db, err = sql.Open("mysql", dsn) + if err != nil { + return fmt.Errorf("open database: %w", err) + } + if err := db.PingContext(cmd.Context()); err != nil { + return fmt.Errorf("ping database: %w", err) + } + store = lib.NewAdminStore(db) + return nil + }, + PersistentPostRunE: func(cmd *cobra.Command, args []string) error { + if db != nil { + return db.Close() + } + return nil + }, + } + + rootCmd.PersistentFlags().StringVar(&dsn, "dsn", "", "MySQL DSN (or set QUEUE_MYSQL_DSN)") + rootCmd.PersistentFlags().BoolVar(&jsonOut, "json", false, "Output in JSON format") + rootCmd.PersistentFlags().BoolVar(&noInteractive, "no-interactive", false, "Skip confirmation prompts (for scripting)") + + rootCmd.AddCommand( + newListTopicsCmd(&store, &jsonOut), + newTopicStatsCmd(&store, &jsonOut), + newListMessagesCmd(&store, &jsonOut), + newInspectMessageCmd(&store, &jsonOut), + newDeleteMessageCmd(&store, &noInteractive), + newPurgeTopicCmd(&store, &noInteractive), + newListDLQCmd(&store, &jsonOut), + newRequeueDLQCmd(&store), + newPurgeDLQCmd(&store, &noInteractive), + newListOffsetsCmd(&store, &jsonOut), + newResetOffsetCmd(&store, &noInteractive), + newListLeasesCmd(&store, &jsonOut), + newConsumerLagCmd(&store, &jsonOut), + newStaleLeasesCmd(&store, &jsonOut), + newReleaseLeaseCmd(&store, &noInteractive), + ) + + return rootCmd +} + +func newListTopicsCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command { + return &cobra.Command{ + Use: "list-topics", + Short: "List all topics with message counts", + RunE: func(cmd *cobra.Command, args []string) error { + topics, err := (*store).ListTopics(cmd.Context()) + if err != nil { + return err + } + if *jsonOut { + return lib.FormatJSON(os.Stdout, topics) + } + headers := []string{"TOPIC", "MESSAGES"} + var rows [][]string + for _, t := range topics { + rows = append(rows, []string{t.Topic, strconv.FormatInt(t.MessageCount, 10)}) + } + lib.FormatTable(os.Stdout, headers, rows) + return nil + }, + } +} + +func newTopicStatsCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command { + var topic, dlqSuffix string + cmd := &cobra.Command{ + Use: "topic-stats", + Short: "Show detailed statistics for a topic", + RunE: func(cmd *cobra.Command, args []string) error { + stats, err := (*store).GetTopicStats(cmd.Context(), topic, dlqSuffix) + if err != nil { + return err + } + if *jsonOut { + return lib.FormatJSON(os.Stdout, stats) + } + headers := []string{"FIELD", "VALUE"} + rows := [][]string{ + {"Topic", stats.Topic}, + {"Total Messages", strconv.FormatInt(stats.TotalMessages, 10)}, + {"Visible Messages", strconv.FormatInt(stats.VisibleMessages, 10)}, + {"Invisible Messages", strconv.FormatInt(stats.InvisibleMessages, 10)}, + {"DLQ Count", strconv.FormatInt(stats.DLQCount, 10)}, + {"Partitions", strconv.FormatInt(stats.PartitionCount, 10)}, + {"Consumer Groups", strconv.FormatInt(stats.ConsumerGroupCount, 10)}, + } + lib.FormatTable(os.Stdout, headers, rows) + return nil + }, + } + cmd.Flags().StringVar(&topic, "topic", "", "Topic name (required)") + cmd.Flags().StringVar(&dlqSuffix, "dlq-suffix", "_dlq", "DLQ topic suffix") + cmd.MarkFlagRequired("topic") + return cmd +} + +func newListMessagesCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command { + var topic, partition string + var limit int + cmd := &cobra.Command{ + Use: "list-messages", + Short: "List messages for a topic", + RunE: func(cmd *cobra.Command, args []string) error { + messages, err := (*store).ListMessages(cmd.Context(), topic, partition, limit) + if err != nil { + return err + } + if *jsonOut { + return lib.FormatJSON(os.Stdout, messages) + } + headers := []string{"OFFSET", "ID", "PARTITION", "RETRIES", "INVISIBLE_UNTIL", "CREATED_AT"} + var rows [][]string + for _, m := range messages { + rows = append(rows, []string{ + strconv.FormatInt(m.Offset, 10), + m.ID, + m.PartitionKey, + strconv.Itoa(m.RetryCount), + lib.FormatMillis(m.InvisibleUntil), + lib.FormatMillis(m.CreatedAt), + }) + } + lib.FormatTable(os.Stdout, headers, rows) + return nil + }, + } + cmd.Flags().StringVar(&topic, "topic", "", "Topic name (required)") + cmd.Flags().StringVar(&partition, "partition", "", "Filter by partition key") + cmd.Flags().IntVar(&limit, "limit", 50, "Maximum number of messages to show") + cmd.MarkFlagRequired("topic") + return cmd +} + +func newInspectMessageCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command { + var topic, messageID string + cmd := &cobra.Command{ + Use: "inspect-message", + Short: "Show full message details including payload and metadata", + RunE: func(cmd *cobra.Command, args []string) error { + detail, found, err := (*store).InspectMessage(cmd.Context(), topic, messageID) + if err != nil { + return err + } + if !found { + return fmt.Errorf("message %q not found in topic %q", messageID, topic) + } + if *jsonOut { + return lib.FormatJSON(os.Stdout, detail) + } + headers := []string{"FIELD", "VALUE"} + rows := [][]string{ + {"Offset", strconv.FormatInt(detail.Offset, 10)}, + {"ID", detail.ID}, + {"Topic", detail.Topic}, + {"Partition", detail.PartitionKey}, + {"Retry Count", strconv.Itoa(detail.RetryCount)}, + {"Invisible Until", lib.FormatMillis(detail.InvisibleUntil)}, + {"Created At", lib.FormatMillis(detail.CreatedAt)}, + {"Published At", lib.FormatMillis(detail.PublishedAt)}, + {"Payload", string(detail.Payload)}, + } + if len(detail.Metadata) > 0 { + for k, v := range detail.Metadata { + rows = append(rows, []string{fmt.Sprintf("Metadata[%s]", k), v}) + } + } + if detail.FailedAt > 0 { + rows = append(rows, + []string{"Failed At", lib.FormatMillis(detail.FailedAt)}, + []string{"Failure Count", strconv.Itoa(detail.FailureCount)}, + []string{"Last Error", detail.LastError}, + []string{"Original Topic", detail.OriginalTopic}, + ) + } + lib.FormatTable(os.Stdout, headers, rows) + return nil + }, + } + cmd.Flags().StringVar(&topic, "topic", "", "Topic name (required)") + cmd.Flags().StringVar(&messageID, "message-id", "", "Message ID (required)") + cmd.MarkFlagRequired("topic") + cmd.MarkFlagRequired("message-id") + return cmd +} + +func newDeleteMessageCmd(store **lib.AdminStore, noInteractive *bool) *cobra.Command { + var topic, messageID string + cmd := &cobra.Command{ + Use: "delete-message", + Short: "Delete a specific message", + RunE: func(cmd *cobra.Command, args []string) error { + if err := confirmAction(*noInteractive, fmt.Sprintf("Delete message %q from topic %q?", messageID, topic)); err != nil { + return err + } + affected, err := (*store).DeleteMessage(cmd.Context(), topic, messageID) + if err != nil { + return err + } + if affected == 0 { + fmt.Fprintf(os.Stderr, "No message found with ID %q in topic %q\n", messageID, topic) + return nil + } + fmt.Printf("Deleted message %q from topic %q\n", messageID, topic) + return nil + }, + } + cmd.Flags().StringVar(&topic, "topic", "", "Topic name (required)") + cmd.Flags().StringVar(&messageID, "message-id", "", "Message ID (required)") + cmd.MarkFlagRequired("topic") + cmd.MarkFlagRequired("message-id") + return cmd +} + +func newPurgeTopicCmd(store **lib.AdminStore, noInteractive *bool) *cobra.Command { + var topic string + cmd := &cobra.Command{ + Use: "purge-topic", + Short: "Delete all messages for a topic", + RunE: func(cmd *cobra.Command, args []string) error { + if err := confirmAction(*noInteractive, fmt.Sprintf("Purge ALL messages from topic %q?", topic)); err != nil { + return err + } + affected, err := (*store).PurgeTopic(cmd.Context(), topic) + if err != nil { + return err + } + fmt.Printf("Purged %d messages from topic %q\n", affected, topic) + return nil + }, + } + cmd.Flags().StringVar(&topic, "topic", "", "Topic name (required)") + cmd.MarkFlagRequired("topic") + return cmd +} + +func newListDLQCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command { + var topic, dlqSuffix string + var limit int + cmd := &cobra.Command{ + Use: "list-dlq", + Short: "List dead-letter queue messages for a topic", + RunE: func(cmd *cobra.Command, args []string) error { + dlqTopic := topic + dlqSuffix + messages, err := (*store).ListMessages(cmd.Context(), dlqTopic, "", limit) + if err != nil { + return err + } + if *jsonOut { + return lib.FormatJSON(os.Stdout, messages) + } + headers := []string{"OFFSET", "ID", "PARTITION", "RETRIES", "CREATED_AT"} + var rows [][]string + for _, m := range messages { + rows = append(rows, []string{ + strconv.FormatInt(m.Offset, 10), + m.ID, + m.PartitionKey, + strconv.Itoa(m.RetryCount), + lib.FormatMillis(m.CreatedAt), + }) + } + lib.FormatTable(os.Stdout, headers, rows) + return nil + }, + } + cmd.Flags().StringVar(&topic, "topic", "", "Original topic name (required)") + cmd.Flags().StringVar(&dlqSuffix, "dlq-suffix", "_dlq", "DLQ topic suffix") + cmd.Flags().IntVar(&limit, "limit", 50, "Maximum number of messages to show") + cmd.MarkFlagRequired("topic") + return cmd +} + +func newRequeueDLQCmd(store **lib.AdminStore) *cobra.Command { + var topic, messageID, dlqSuffix string + cmd := &cobra.Command{ + Use: "requeue-dlq", + Short: "Move a DLQ message back to its original topic", + RunE: func(cmd *cobra.Command, args []string) error { + if err := (*store).RequeueDLQ(cmd.Context(), topic, messageID, dlqSuffix); err != nil { + return err + } + fmt.Printf("Requeued message %q from DLQ back to topic %q\n", messageID, topic) + return nil + }, + } + cmd.Flags().StringVar(&topic, "topic", "", "Original topic name (required)") + cmd.Flags().StringVar(&messageID, "message-id", "", "Message ID (required)") + cmd.Flags().StringVar(&dlqSuffix, "dlq-suffix", "_dlq", "DLQ topic suffix") + cmd.MarkFlagRequired("topic") + cmd.MarkFlagRequired("message-id") + return cmd +} + +func newPurgeDLQCmd(store **lib.AdminStore, noInteractive *bool) *cobra.Command { + var topic, dlqSuffix string + cmd := &cobra.Command{ + Use: "purge-dlq", + Short: "Delete all DLQ messages for a topic", + RunE: func(cmd *cobra.Command, args []string) error { + dlqTopic := topic + dlqSuffix + if err := confirmAction(*noInteractive, fmt.Sprintf("Purge ALL messages from DLQ topic %q?", dlqTopic)); err != nil { + return err + } + affected, err := (*store).PurgeTopic(cmd.Context(), dlqTopic) + if err != nil { + return err + } + fmt.Printf("Purged %d messages from DLQ topic %q\n", affected, dlqTopic) + return nil + }, + } + cmd.Flags().StringVar(&topic, "topic", "", "Original topic name (required)") + cmd.Flags().StringVar(&dlqSuffix, "dlq-suffix", "_dlq", "DLQ topic suffix") + cmd.MarkFlagRequired("topic") + return cmd +} + +func newListOffsetsCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command { + var consumerGroup string + cmd := &cobra.Command{ + Use: "list-offsets", + Short: "Show consumer group offsets", + RunE: func(cmd *cobra.Command, args []string) error { + offsets, err := (*store).ListOffsets(cmd.Context(), consumerGroup) + if err != nil { + return err + } + if *jsonOut { + return lib.FormatJSON(os.Stdout, offsets) + } + headers := []string{"CONSUMER_GROUP", "TOPIC", "PARTITION", "OFFSET_ACKED", "UPDATED_AT"} + var rows [][]string + for _, o := range offsets { + rows = append(rows, []string{ + o.ConsumerGroup, + o.Topic, + o.PartitionKey, + strconv.FormatInt(o.OffsetAcked, 10), + lib.FormatMillis(o.UpdatedAt), + }) + } + lib.FormatTable(os.Stdout, headers, rows) + return nil + }, + } + cmd.Flags().StringVar(&consumerGroup, "consumer-group", "", "Filter by consumer group") + return cmd +} + +func newResetOffsetCmd(store **lib.AdminStore, noInteractive *bool) *cobra.Command { + var consumerGroup, topic, partition string + var offset int64 + cmd := &cobra.Command{ + Use: "reset-offset", + Short: "Reset consumer group offset for a partition", + RunE: func(cmd *cobra.Command, args []string) error { + if err := confirmAction(*noInteractive, fmt.Sprintf("Reset offset to %d for consumer-group=%q topic=%q partition=%q?", offset, consumerGroup, topic, partition)); err != nil { + return err + } + affected, err := (*store).ResetOffset(cmd.Context(), consumerGroup, topic, partition, offset) + if err != nil { + return err + } + if affected == 0 { + fmt.Fprintf(os.Stderr, "No offset found for consumer-group=%q topic=%q partition=%q\n", consumerGroup, topic, partition) + return nil + } + fmt.Printf("Reset offset to %d for consumer-group=%q topic=%q partition=%q\n", offset, consumerGroup, topic, partition) + return nil + }, + } + cmd.Flags().StringVar(&consumerGroup, "consumer-group", "", "Consumer group name (required)") + cmd.Flags().StringVar(&topic, "topic", "", "Topic name (required)") + cmd.Flags().StringVar(&partition, "partition", "", "Partition key (required)") + cmd.Flags().Int64Var(&offset, "offset", 0, "New offset value (default 0)") + cmd.MarkFlagRequired("consumer-group") + cmd.MarkFlagRequired("topic") + cmd.MarkFlagRequired("partition") + return cmd +} + +func newListLeasesCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command { + return &cobra.Command{ + Use: "list-leases", + Short: "Show all active partition leases", + RunE: func(cmd *cobra.Command, args []string) error { + leases, err := (*store).ListLeases(cmd.Context()) + if err != nil { + return err + } + if *jsonOut { + return lib.FormatJSON(os.Stdout, leases) + } + headers := []string{"CONSUMER_GROUP", "TOPIC", "PARTITION", "LEASED_BY", "LEASED_AT", "RENEWED_AT"} + var rows [][]string + for _, l := range leases { + rows = append(rows, []string{ + l.ConsumerGroup, + l.Topic, + l.PartitionKey, + l.LeasedBy, + lib.FormatMillis(l.LeasedAt), + lib.FormatMillis(l.LeaseRenewedAt), + }) + } + lib.FormatTable(os.Stdout, headers, rows) + return nil + }, + } +} + +func newConsumerLagCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command { + var topic string + cmd := &cobra.Command{ + Use: "consumer-lag", + Short: "Show per-partition consumer lag for a topic", + RunE: func(cmd *cobra.Command, args []string) error { + lags, err := (*store).ConsumerLag(cmd.Context(), topic) + if err != nil { + return err + } + if *jsonOut { + return lib.FormatJSON(os.Stdout, lags) + } + headers := []string{"CONSUMER_GROUP", "TOPIC", "PARTITION", "ACKED", "LATEST", "LAG"} + var rows [][]string + for _, l := range lags { + rows = append(rows, []string{ + l.ConsumerGroup, + l.Topic, + l.PartitionKey, + strconv.FormatInt(l.AckedOffset, 10), + strconv.FormatInt(l.LatestOffset, 10), + strconv.FormatInt(l.Lag, 10), + }) + } + lib.FormatTable(os.Stdout, headers, rows) + return nil + }, + } + cmd.Flags().StringVar(&topic, "topic", "", "Topic name (required)") + cmd.MarkFlagRequired("topic") + return cmd +} + +func newStaleLeasesCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command { + var thresholdMs int64 + cmd := &cobra.Command{ + Use: "stale-leases", + Short: "Show partition leases not renewed within a threshold", + RunE: func(cmd *cobra.Command, args []string) error { + leases, err := (*store).StaleLeases(cmd.Context(), thresholdMs) + if err != nil { + return err + } + if len(leases) == 0 { + fmt.Println("No stale leases found.") + return nil + } + if *jsonOut { + return lib.FormatJSON(os.Stdout, leases) + } + headers := []string{"CONSUMER_GROUP", "TOPIC", "PARTITION", "LEASED_BY", "LEASED_AT", "RENEWED_AT"} + var rows [][]string + for _, l := range leases { + rows = append(rows, []string{ + l.ConsumerGroup, + l.Topic, + l.PartitionKey, + l.LeasedBy, + lib.FormatMillis(l.LeasedAt), + lib.FormatMillis(l.LeaseRenewedAt), + }) + } + lib.FormatTable(os.Stdout, headers, rows) + return nil + }, + } + cmd.Flags().Int64Var(&thresholdMs, "threshold", 60000, "Staleness threshold in milliseconds (default 60s)") + return cmd +} + +func newReleaseLeaseCmd(store **lib.AdminStore, noInteractive *bool) *cobra.Command { + var consumerGroup, topic, partition string + cmd := &cobra.Command{ + Use: "release-lease", + Short: "Force-release a partition lease", + RunE: func(cmd *cobra.Command, args []string) error { + if err := confirmAction(*noInteractive, fmt.Sprintf("Release lease for consumer-group=%q topic=%q partition=%q?", consumerGroup, topic, partition)); err != nil { + return err + } + affected, err := (*store).ReleaseLease(cmd.Context(), consumerGroup, topic, partition) + if err != nil { + return err + } + if affected == 0 { + fmt.Fprintf(os.Stderr, "No lease found for consumer-group=%q topic=%q partition=%q\n", consumerGroup, topic, partition) + return nil + } + fmt.Printf("Released lease for consumer-group=%q topic=%q partition=%q\n", consumerGroup, topic, partition) + return nil + }, + } + cmd.Flags().StringVar(&consumerGroup, "consumer-group", "", "Consumer group name (required)") + cmd.Flags().StringVar(&topic, "topic", "", "Topic name (required)") + cmd.Flags().StringVar(&partition, "partition", "", "Partition key (required)") + cmd.MarkFlagRequired("consumer-group") + cmd.MarkFlagRequired("topic") + cmd.MarkFlagRequired("partition") + return cmd +} diff --git a/extension/queue/mysql/ctl/lib/BUILD.bazel b/extension/queue/mysql/ctl/lib/BUILD.bazel new file mode 100644 index 00000000..d7e7b1ad --- /dev/null +++ b/extension/queue/mysql/ctl/lib/BUILD.bazel @@ -0,0 +1,23 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "lib", + srcs = [ + "admin.go", + "format.go", + ], + importpath = "github.com/uber/submitqueue/extension/queue/mysql/ctl/lib", + visibility = ["//visibility:public"], + deps = ["//extension/queue/mysql"], +) + +go_test( + name = "lib_test", + srcs = ["admin_test.go"], + embed = [":lib"], + deps = [ + "@com_github_data_dog_go_sqlmock//:go-sqlmock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/extension/queue/mysql/ctl/lib/admin.go b/extension/queue/mysql/ctl/lib/admin.go new file mode 100644 index 00000000..aa68bf60 --- /dev/null +++ b/extension/queue/mysql/ctl/lib/admin.go @@ -0,0 +1,486 @@ +package lib + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "time" + + mysql "github.com/uber/submitqueue/extension/queue/mysql" +) + +// AdminStore provides read-only inspection and targeted admin mutations +// for the MySQL queue tables. +type AdminStore struct { + db *sql.DB +} + +// NewAdminStore creates a new AdminStore backed by the given database connection. +func NewAdminStore(db *sql.DB) *AdminStore { + return &AdminStore{db: db} +} + +// MessageSummary contains a subset of message fields for listing. +type MessageSummary struct { + // Offset is the auto-incrementing sequence number + Offset int64 + // ID is the unique message identifier + ID string + // Topic identifies the queue type + Topic string + // PartitionKey determines message distribution + PartitionKey string + // RetryCount tracks retries on the current topic + RetryCount int + // InvisibleUntil is the epoch milliseconds until which the message is hidden + InvisibleUntil int64 + // CreatedAt is the epoch milliseconds when the message was created + CreatedAt int64 + // PublishedAt is the epoch milliseconds when the message was published + PublishedAt int64 +} + +// MessageDetail contains all message fields including payload and DLQ info. +type MessageDetail struct { + MessageSummary + // Payload is the message body + Payload []byte + // Metadata contains key-value pairs for message attributes + Metadata map[string]string + // FailedAt is epoch milliseconds when the message failed (0 for normal) + FailedAt int64 + // FailureCount is total failures before DLQ move (0 for normal) + FailureCount int + // LastError is the error message from final failure + LastError string + // OriginalTopic is where the message originally failed + OriginalTopic string +} + +// OffsetInfo contains consumer group offset information. +type OffsetInfo struct { + // ConsumerGroup is the consumer group name + ConsumerGroup string + // Topic is the topic being consumed + Topic string + // PartitionKey is the partition being consumed + PartitionKey string + // OffsetAcked is the last successfully acked offset + OffsetAcked int64 + // UpdatedAt is the epoch milliseconds of the last update + UpdatedAt int64 +} + +// LeaseInfo contains partition lease information. +type LeaseInfo struct { + // ConsumerGroup is the consumer group name + ConsumerGroup string + // Topic is the topic being consumed + Topic string + // PartitionKey is the partition that is leased + PartitionKey string + // LeasedBy is the worker that owns the lease + LeasedBy string + // LeasedAt is the epoch milliseconds when the lease was acquired + LeasedAt int64 + // LeaseRenewedAt is the epoch milliseconds of the last renewal + LeaseRenewedAt int64 +} + +// TopicInfo contains a topic name and its message count. +type TopicInfo struct { + // Topic is the queue topic name + Topic string + // MessageCount is the number of messages in this topic + MessageCount int64 +} + +// TopicStats contains detailed statistics for a topic. +type TopicStats struct { + // Topic is the queue topic name + Topic string + // TotalMessages is the total number of messages + TotalMessages int64 + // VisibleMessages is the count of messages currently visible for consumption + VisibleMessages int64 + // InvisibleMessages is the count of messages hidden by visibility timeout + InvisibleMessages int64 + // DLQCount is the number of messages in the DLQ for this topic + DLQCount int64 + // PartitionCount is the number of distinct partitions + PartitionCount int64 + // ConsumerGroupCount is the number of consumer groups consuming this topic + ConsumerGroupCount int64 +} + +// ListTopics returns all topics with their message counts. +func (s *AdminStore) ListTopics(ctx context.Context) ([]TopicInfo, error) { + query := fmt.Sprintf( + "SELECT topic, COUNT(*) FROM %s GROUP BY topic ORDER BY topic", + mysql.MessagesTableName, + ) + rows, err := s.db.QueryContext(ctx, query) + if err != nil { + return nil, fmt.Errorf("list topics: %w", err) + } + defer rows.Close() + + var topics []TopicInfo + for rows.Next() { + var t TopicInfo + if err := rows.Scan(&t.Topic, &t.MessageCount); err != nil { + return nil, fmt.Errorf("scan topic row: %w", err) + } + topics = append(topics, t) + } + return topics, rows.Err() +} + +// GetTopicStats returns detailed statistics for a topic. +func (s *AdminStore) GetTopicStats(ctx context.Context, topic string, dlqSuffix string) (TopicStats, error) { + stats := TopicStats{Topic: topic} + nowMs := time.Now().UnixMilli() + + // Total messages + err := s.db.QueryRowContext(ctx, + fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE topic = ?", mysql.MessagesTableName), + topic, + ).Scan(&stats.TotalMessages) + if err != nil { + return stats, fmt.Errorf("count total: %w", err) + } + + // Visible messages (invisible_until <= now) + err = s.db.QueryRowContext(ctx, + fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE topic = ? AND invisible_until <= ?", mysql.MessagesTableName), + topic, nowMs, + ).Scan(&stats.VisibleMessages) + if err != nil { + return stats, fmt.Errorf("count visible: %w", err) + } + + // Invisible messages + stats.InvisibleMessages = stats.TotalMessages - stats.VisibleMessages + + // DLQ count + dlqTopic := topic + dlqSuffix + err = s.db.QueryRowContext(ctx, + fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE topic = ?", mysql.MessagesTableName), + dlqTopic, + ).Scan(&stats.DLQCount) + if err != nil { + return stats, fmt.Errorf("count dlq: %w", err) + } + + // Distinct partitions + err = s.db.QueryRowContext(ctx, + fmt.Sprintf("SELECT COUNT(DISTINCT partition_key) FROM %s WHERE topic = ?", mysql.MessagesTableName), + topic, + ).Scan(&stats.PartitionCount) + if err != nil { + return stats, fmt.Errorf("count partitions: %w", err) + } + + // Consumer groups from offsets + err = s.db.QueryRowContext(ctx, + fmt.Sprintf("SELECT COUNT(DISTINCT consumer_group) FROM %s WHERE topic = ?", mysql.OffsetsTableName), + topic, + ).Scan(&stats.ConsumerGroupCount) + if err != nil { + return stats, fmt.Errorf("count consumer groups: %w", err) + } + + return stats, nil +} + +// ListMessages returns messages for a topic, optionally filtered by partition. +func (s *AdminStore) ListMessages(ctx context.Context, topic string, partition string, limit int) ([]MessageSummary, error) { + var rows *sql.Rows + var err error + + if partition != "" { + rows, err = s.db.QueryContext(ctx, + fmt.Sprintf("SELECT `offset`, id, topic, partition_key, retry_count, invisible_until, created_at, published_at FROM %s WHERE topic = ? AND partition_key = ? ORDER BY `offset` LIMIT ?", mysql.MessagesTableName), + topic, partition, limit, + ) + } else { + rows, err = s.db.QueryContext(ctx, + fmt.Sprintf("SELECT `offset`, id, topic, partition_key, retry_count, invisible_until, created_at, published_at FROM %s WHERE topic = ? ORDER BY `offset` LIMIT ?", mysql.MessagesTableName), + topic, limit, + ) + } + if err != nil { + return nil, fmt.Errorf("list messages: %w", err) + } + defer rows.Close() + + var messages []MessageSummary + for rows.Next() { + var m MessageSummary + if err := rows.Scan(&m.Offset, &m.ID, &m.Topic, &m.PartitionKey, &m.RetryCount, &m.InvisibleUntil, &m.CreatedAt, &m.PublishedAt); err != nil { + return nil, fmt.Errorf("scan message row: %w", err) + } + messages = append(messages, m) + } + return messages, rows.Err() +} + +// InspectMessage returns full message details including payload and DLQ fields. +func (s *AdminStore) InspectMessage(ctx context.Context, topic string, messageID string) (MessageDetail, bool, error) { + var d MessageDetail + var metadataJSON []byte + + err := s.db.QueryRowContext(ctx, + fmt.Sprintf("SELECT `offset`, id, topic, partition_key, retry_count, invisible_until, created_at, published_at, payload, metadata, failed_at, failure_count, last_error, original_topic FROM %s WHERE topic = ? AND id = ?", mysql.MessagesTableName), + topic, messageID, + ).Scan(&d.Offset, &d.ID, &d.Topic, &d.PartitionKey, &d.RetryCount, &d.InvisibleUntil, &d.CreatedAt, &d.PublishedAt, &d.Payload, &metadataJSON, &d.FailedAt, &d.FailureCount, &d.LastError, &d.OriginalTopic) + if err == sql.ErrNoRows { + return d, false, nil + } + if err != nil { + return d, false, fmt.Errorf("inspect message: %w", err) + } + + if len(metadataJSON) > 0 { + if err := json.Unmarshal(metadataJSON, &d.Metadata); err != nil { + return d, false, fmt.Errorf("unmarshal metadata: %w", err) + } + } + if d.Metadata == nil { + d.Metadata = make(map[string]string) + } + + return d, true, nil +} + +// DeleteMessage deletes a specific message by topic and ID. +func (s *AdminStore) DeleteMessage(ctx context.Context, topic string, messageID string) (int64, error) { + result, err := s.db.ExecContext(ctx, + fmt.Sprintf("DELETE FROM %s WHERE topic = ? AND id = ?", mysql.MessagesTableName), + topic, messageID, + ) + if err != nil { + return 0, fmt.Errorf("delete message: %w", err) + } + return result.RowsAffected() +} + +// PurgeTopic deletes all messages for a topic. +func (s *AdminStore) PurgeTopic(ctx context.Context, topic string) (int64, error) { + result, err := s.db.ExecContext(ctx, + fmt.Sprintf("DELETE FROM %s WHERE topic = ?", mysql.MessagesTableName), + topic, + ) + if err != nil { + return 0, fmt.Errorf("purge topic: %w", err) + } + return result.RowsAffected() +} + +// RequeueDLQ moves a message from the DLQ topic back to its original topic. +// This is done transactionally: read from DLQ, insert into original topic, delete from DLQ. +func (s *AdminStore) RequeueDLQ(ctx context.Context, topic string, messageID string, dlqSuffix string) error { + dlqTopic := topic + dlqSuffix + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + defer tx.Rollback() + + // Read the DLQ message + var payload []byte + var metadataJSON []byte + var partitionKey string + var createdAt, publishedAt int64 + + err = tx.QueryRowContext(ctx, + fmt.Sprintf("SELECT payload, metadata, partition_key, created_at, published_at FROM %s WHERE topic = ? AND id = ?", mysql.MessagesTableName), + dlqTopic, messageID, + ).Scan(&payload, &metadataJSON, &partitionKey, &createdAt, &publishedAt) + if err == sql.ErrNoRows { + return fmt.Errorf("message %q not found in DLQ topic %q", messageID, dlqTopic) + } + if err != nil { + return fmt.Errorf("read dlq message: %w", err) + } + + // Insert into original topic with reset fields + nowMs := time.Now().UnixMilli() + _, err = tx.ExecContext(ctx, + fmt.Sprintf("INSERT INTO %s (topic, partition_key, id, payload, metadata, retry_count, invisible_until, created_at, published_at, failed_at, failure_count, last_error, original_topic) VALUES (?, ?, ?, ?, ?, 0, 0, ?, ?, 0, 0, '', '')", mysql.MessagesTableName), + topic, partitionKey, messageID, payload, metadataJSON, createdAt, nowMs, + ) + if err != nil { + return fmt.Errorf("insert requeued message: %w", err) + } + + // Delete from DLQ + _, err = tx.ExecContext(ctx, + fmt.Sprintf("DELETE FROM %s WHERE topic = ? AND id = ?", mysql.MessagesTableName), + dlqTopic, messageID, + ) + if err != nil { + return fmt.Errorf("delete dlq message: %w", err) + } + + return tx.Commit() +} + +// ListOffsets returns consumer group offsets, optionally filtered by group. +func (s *AdminStore) ListOffsets(ctx context.Context, consumerGroup string) ([]OffsetInfo, error) { + var rows *sql.Rows + var err error + + if consumerGroup != "" { + rows, err = s.db.QueryContext(ctx, + fmt.Sprintf("SELECT consumer_group, topic, partition_key, offset_acked, updated_at FROM %s WHERE consumer_group = ? ORDER BY consumer_group, topic, partition_key", mysql.OffsetsTableName), + consumerGroup, + ) + } else { + rows, err = s.db.QueryContext(ctx, + fmt.Sprintf("SELECT consumer_group, topic, partition_key, offset_acked, updated_at FROM %s ORDER BY consumer_group, topic, partition_key", mysql.OffsetsTableName), + ) + } + if err != nil { + return nil, fmt.Errorf("list offsets: %w", err) + } + defer rows.Close() + + var offsets []OffsetInfo + for rows.Next() { + var o OffsetInfo + if err := rows.Scan(&o.ConsumerGroup, &o.Topic, &o.PartitionKey, &o.OffsetAcked, &o.UpdatedAt); err != nil { + return nil, fmt.Errorf("scan offset row: %w", err) + } + offsets = append(offsets, o) + } + return offsets, rows.Err() +} + +// ResetOffset updates the acked offset for a consumer group/topic/partition. +func (s *AdminStore) ResetOffset(ctx context.Context, consumerGroup, topic, partition string, offset int64) (int64, error) { + nowMs := time.Now().UnixMilli() + result, err := s.db.ExecContext(ctx, + fmt.Sprintf("UPDATE %s SET offset_acked = ?, updated_at = ? WHERE consumer_group = ? AND topic = ? AND partition_key = ?", mysql.OffsetsTableName), + offset, nowMs, consumerGroup, topic, partition, + ) + if err != nil { + return 0, fmt.Errorf("reset offset: %w", err) + } + return result.RowsAffected() +} + +// ListLeases returns all partition leases. +func (s *AdminStore) ListLeases(ctx context.Context) ([]LeaseInfo, error) { + rows, err := s.db.QueryContext(ctx, + fmt.Sprintf("SELECT consumer_group, topic, partition_key, leased_by, leased_at, lease_renewed_at FROM %s ORDER BY consumer_group, topic, partition_key", mysql.PartitionLeasesTableName), + ) + if err != nil { + return nil, fmt.Errorf("list leases: %w", err) + } + defer rows.Close() + + var leases []LeaseInfo + for rows.Next() { + var l LeaseInfo + if err := rows.Scan(&l.ConsumerGroup, &l.Topic, &l.PartitionKey, &l.LeasedBy, &l.LeasedAt, &l.LeaseRenewedAt); err != nil { + return nil, fmt.Errorf("scan lease row: %w", err) + } + leases = append(leases, l) + } + return leases, rows.Err() +} + +// LagInfo contains consumer lag information for a single partition. +type LagInfo struct { + // ConsumerGroup is the consumer group name + ConsumerGroup string + // Topic is the topic being consumed + Topic string + // PartitionKey is the partition being consumed + PartitionKey string + // LatestOffset is the highest message offset in this partition + LatestOffset int64 + // AckedOffset is the last acked offset for this consumer group + AckedOffset int64 + // Lag is the number of unprocessed messages (LatestOffset - AckedOffset) + Lag int64 +} + +// ConsumerLag returns per-partition lag for each consumer group on a topic. +// Lag = max message offset in partition - consumer group's acked offset. +func (s *AdminStore) ConsumerLag(ctx context.Context, topic string) ([]LagInfo, error) { + query := fmt.Sprintf(` + SELECT o.consumer_group, o.topic, o.partition_key, o.offset_acked, + COALESCE(m.latest_offset, 0) AS latest_offset + FROM %s o + LEFT JOIN ( + SELECT topic, partition_key, MAX(`+"`offset`"+`) AS latest_offset + FROM %s + WHERE topic = ? + GROUP BY topic, partition_key + ) m ON o.topic = m.topic AND o.partition_key = m.partition_key + WHERE o.topic = ? + ORDER BY o.consumer_group, o.partition_key`, + mysql.OffsetsTableName, mysql.MessagesTableName, + ) + + rows, err := s.db.QueryContext(ctx, query, topic, topic) + if err != nil { + return nil, fmt.Errorf("consumer lag: %w", err) + } + defer rows.Close() + + var results []LagInfo + for rows.Next() { + var l LagInfo + if err := rows.Scan(&l.ConsumerGroup, &l.Topic, &l.PartitionKey, &l.AckedOffset, &l.LatestOffset); err != nil { + return nil, fmt.Errorf("scan lag row: %w", err) + } + l.Lag = l.LatestOffset - l.AckedOffset + if l.Lag < 0 { + l.Lag = 0 + } + results = append(results, l) + } + return results, rows.Err() +} + +// StaleLeases returns leases whose lease_renewed_at is older than the threshold. +// thresholdMs is the staleness threshold in milliseconds — leases not renewed +// within this duration from now are considered stale. +func (s *AdminStore) StaleLeases(ctx context.Context, thresholdMs int64) ([]LeaseInfo, error) { + cutoff := time.Now().UnixMilli() - thresholdMs + rows, err := s.db.QueryContext(ctx, + fmt.Sprintf("SELECT consumer_group, topic, partition_key, leased_by, leased_at, lease_renewed_at FROM %s WHERE lease_renewed_at < ? ORDER BY lease_renewed_at", mysql.PartitionLeasesTableName), + cutoff, + ) + if err != nil { + return nil, fmt.Errorf("stale leases: %w", err) + } + defer rows.Close() + + var leases []LeaseInfo + for rows.Next() { + var l LeaseInfo + if err := rows.Scan(&l.ConsumerGroup, &l.Topic, &l.PartitionKey, &l.LeasedBy, &l.LeasedAt, &l.LeaseRenewedAt); err != nil { + return nil, fmt.Errorf("scan stale lease row: %w", err) + } + leases = append(leases, l) + } + return leases, rows.Err() +} + +// ReleaseLease force-releases a partition lease. +func (s *AdminStore) ReleaseLease(ctx context.Context, consumerGroup, topic, partition string) (int64, error) { + result, err := s.db.ExecContext(ctx, + fmt.Sprintf("DELETE FROM %s WHERE consumer_group = ? AND topic = ? AND partition_key = ?", mysql.PartitionLeasesTableName), + consumerGroup, topic, partition, + ) + if err != nil { + return 0, fmt.Errorf("release lease: %w", err) + } + return result.RowsAffected() +} diff --git a/extension/queue/mysql/ctl/lib/admin_test.go b/extension/queue/mysql/ctl/lib/admin_test.go new file mode 100644 index 00000000..f7d2219b --- /dev/null +++ b/extension/queue/mysql/ctl/lib/admin_test.go @@ -0,0 +1,420 @@ +package lib + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestListTopics(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + rows := sqlmock.NewRows([]string{"topic", "count"}). + AddRow("orders", 10). + AddRow("payments", 5) + mock.ExpectQuery("SELECT topic, COUNT\\(\\*\\) FROM queue_messages GROUP BY topic ORDER BY topic"). + WillReturnRows(rows) + + topics, err := store.ListTopics(context.Background()) + require.NoError(t, err) + assert.Len(t, topics, 2) + assert.Equal(t, "orders", topics[0].Topic) + assert.Equal(t, int64(10), topics[0].MessageCount) + assert.Equal(t, "payments", topics[1].Topic) + assert.Equal(t, int64(5), topics[1].MessageCount) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestListTopicsEmpty(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + rows := sqlmock.NewRows([]string{"topic", "count"}) + mock.ExpectQuery("SELECT topic, COUNT\\(\\*\\) FROM queue_messages GROUP BY topic ORDER BY topic"). + WillReturnRows(rows) + + topics, err := store.ListTopics(context.Background()) + require.NoError(t, err) + assert.Empty(t, topics) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestGetTopicStats(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + // Total messages + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM queue_messages WHERE topic = \\?"). + WithArgs("orders"). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(100)) + + // Visible messages + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM queue_messages WHERE topic = \\? AND invisible_until <= \\?"). + WithArgs("orders", sqlmock.AnyArg()). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(80)) + + // DLQ count + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM queue_messages WHERE topic = \\?"). + WithArgs("orders_dlq"). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(3)) + + // Distinct partitions + mock.ExpectQuery("SELECT COUNT\\(DISTINCT partition_key\\) FROM queue_messages WHERE topic = \\?"). + WithArgs("orders"). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(4)) + + // Consumer groups + mock.ExpectQuery("SELECT COUNT\\(DISTINCT consumer_group\\) FROM queue_offsets WHERE topic = \\?"). + WithArgs("orders"). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(2)) + + stats, err := store.GetTopicStats(context.Background(), "orders", "_dlq") + require.NoError(t, err) + assert.Equal(t, "orders", stats.Topic) + assert.Equal(t, int64(100), stats.TotalMessages) + assert.Equal(t, int64(80), stats.VisibleMessages) + assert.Equal(t, int64(20), stats.InvisibleMessages) + assert.Equal(t, int64(3), stats.DLQCount) + assert.Equal(t, int64(4), stats.PartitionCount) + assert.Equal(t, int64(2), stats.ConsumerGroupCount) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestListMessages(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "retry_count", "invisible_until", "created_at", "published_at"}). + AddRow(1, "msg-1", "orders", "repo-1", 0, 0, 1000, 1000). + AddRow(2, "msg-2", "orders", "repo-1", 1, 5000, 2000, 2000) + mock.ExpectQuery("SELECT .+ FROM queue_messages WHERE topic = \\? ORDER BY `offset` LIMIT \\?"). + WithArgs("orders", 50). + WillReturnRows(rows) + + messages, err := store.ListMessages(context.Background(), "orders", "", 50) + require.NoError(t, err) + assert.Len(t, messages, 2) + assert.Equal(t, "msg-1", messages[0].ID) + assert.Equal(t, int64(1), messages[0].Offset) + assert.Equal(t, "msg-2", messages[1].ID) + assert.Equal(t, 1, messages[1].RetryCount) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestListMessagesWithPartition(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "retry_count", "invisible_until", "created_at", "published_at"}). + AddRow(1, "msg-1", "orders", "repo-1", 0, 0, 1000, 1000) + mock.ExpectQuery("SELECT .+ FROM queue_messages WHERE topic = \\? AND partition_key = \\? ORDER BY `offset` LIMIT \\?"). + WithArgs("orders", "repo-1", 10). + WillReturnRows(rows) + + messages, err := store.ListMessages(context.Background(), "orders", "repo-1", 10) + require.NoError(t, err) + assert.Len(t, messages, 1) + assert.Equal(t, "repo-1", messages[0].PartitionKey) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestInspectMessage(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "retry_count", "invisible_until", "created_at", "published_at", "payload", "metadata", "failed_at", "failure_count", "last_error", "original_topic"}). + AddRow(1, "msg-1", "orders", "repo-1", 0, 0, 1000, 1000, []byte("hello"), []byte(`{"key":"val"}`), 0, 0, "", "") + mock.ExpectQuery("SELECT .+ FROM queue_messages WHERE topic = \\? AND id = \\?"). + WithArgs("orders", "msg-1"). + WillReturnRows(rows) + + detail, found, err := store.InspectMessage(context.Background(), "orders", "msg-1") + require.NoError(t, err) + assert.True(t, found) + assert.Equal(t, "msg-1", detail.ID) + assert.Equal(t, []byte("hello"), detail.Payload) + assert.Equal(t, "val", detail.Metadata["key"]) + assert.Equal(t, int64(0), detail.FailedAt) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestInspectMessageNotFound(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + rows := sqlmock.NewRows([]string{"offset", "id", "topic", "partition_key", "retry_count", "invisible_until", "created_at", "published_at", "payload", "metadata", "failed_at", "failure_count", "last_error", "original_topic"}) + mock.ExpectQuery("SELECT .+ FROM queue_messages WHERE topic = \\? AND id = \\?"). + WithArgs("orders", "missing"). + WillReturnRows(rows) + + _, found, err := store.InspectMessage(context.Background(), "orders", "missing") + require.NoError(t, err) + assert.False(t, found) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestDeleteMessage(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + mock.ExpectExec("DELETE FROM queue_messages WHERE topic = \\? AND id = \\?"). + WithArgs("orders", "msg-1"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + affected, err := store.DeleteMessage(context.Background(), "orders", "msg-1") + require.NoError(t, err) + assert.Equal(t, int64(1), affected) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestPurgeTopic(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + mock.ExpectExec("DELETE FROM queue_messages WHERE topic = \\?"). + WithArgs("orders"). + WillReturnResult(sqlmock.NewResult(0, 42)) + + affected, err := store.PurgeTopic(context.Background(), "orders") + require.NoError(t, err) + assert.Equal(t, int64(42), affected) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestRequeueDLQ(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + mock.ExpectBegin() + mock.ExpectQuery("SELECT .+ FROM queue_messages WHERE topic = \\? AND id = \\?"). + WithArgs("orders_dlq", "msg-1"). + WillReturnRows(sqlmock.NewRows([]string{"payload", "metadata", "partition_key", "created_at", "published_at"}). + AddRow([]byte("data"), []byte(`{}`), "repo-1", 1000, 1000)) + mock.ExpectExec("INSERT INTO queue_messages"). + WithArgs("orders", "repo-1", "msg-1", []byte("data"), []byte(`{}`), int64(1000), sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("DELETE FROM queue_messages WHERE topic = \\? AND id = \\?"). + WithArgs("orders_dlq", "msg-1"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + + err = store.RequeueDLQ(context.Background(), "orders", "msg-1", "_dlq") + require.NoError(t, err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestListOffsets(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + rows := sqlmock.NewRows([]string{"consumer_group", "topic", "partition_key", "offset_acked", "updated_at"}). + AddRow("group-1", "orders", "repo-1", 100, 5000) + mock.ExpectQuery("SELECT .+ FROM queue_offsets ORDER BY"). + WillReturnRows(rows) + + offsets, err := store.ListOffsets(context.Background(), "") + require.NoError(t, err) + assert.Len(t, offsets, 1) + assert.Equal(t, "group-1", offsets[0].ConsumerGroup) + assert.Equal(t, int64(100), offsets[0].OffsetAcked) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestListOffsetsFiltered(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + rows := sqlmock.NewRows([]string{"consumer_group", "topic", "partition_key", "offset_acked", "updated_at"}). + AddRow("group-1", "orders", "repo-1", 100, 5000) + mock.ExpectQuery("SELECT .+ FROM queue_offsets WHERE consumer_group = \\?"). + WithArgs("group-1"). + WillReturnRows(rows) + + offsets, err := store.ListOffsets(context.Background(), "group-1") + require.NoError(t, err) + assert.Len(t, offsets, 1) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestResetOffset(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + mock.ExpectExec("UPDATE queue_offsets SET offset_acked = \\?, updated_at = \\?"). + WithArgs(int64(0), sqlmock.AnyArg(), "group-1", "orders", "repo-1"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + affected, err := store.ResetOffset(context.Background(), "group-1", "orders", "repo-1", 0) + require.NoError(t, err) + assert.Equal(t, int64(1), affected) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestListLeases(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + rows := sqlmock.NewRows([]string{"consumer_group", "topic", "partition_key", "leased_by", "leased_at", "lease_renewed_at"}). + AddRow("group-1", "orders", "repo-1", "worker-1", 1000, 2000) + mock.ExpectQuery("SELECT .+ FROM queue_partition_leases ORDER BY"). + WillReturnRows(rows) + + leases, err := store.ListLeases(context.Background()) + require.NoError(t, err) + assert.Len(t, leases, 1) + assert.Equal(t, "worker-1", leases[0].LeasedBy) + assert.Equal(t, int64(1000), leases[0].LeasedAt) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestReleaseLease(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + mock.ExpectExec("DELETE FROM queue_partition_leases WHERE consumer_group = \\? AND topic = \\? AND partition_key = \\?"). + WithArgs("group-1", "orders", "repo-1"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + affected, err := store.ReleaseLease(context.Background(), "group-1", "orders", "repo-1") + require.NoError(t, err) + assert.Equal(t, int64(1), affected) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestFormatMillis(t *testing.T) { + assert.Equal(t, "-", FormatMillis(0)) + // 2024-01-01T00:00:00Z in milliseconds + assert.Equal(t, "2024-01-01T00:00:00Z", FormatMillis(1704067200000)) +} + +func TestConsumerLag(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + rows := sqlmock.NewRows([]string{"consumer_group", "topic", "partition_key", "offset_acked", "latest_offset"}). + AddRow("group-1", "orders", "repo-1", 50, 100). + AddRow("group-1", "orders", "repo-2", 75, 75) + mock.ExpectQuery("SELECT .+ FROM queue_offsets .+ LEFT JOIN"). + WithArgs("orders", "orders"). + WillReturnRows(rows) + + lags, err := store.ConsumerLag(context.Background(), "orders") + require.NoError(t, err) + assert.Len(t, lags, 2) + assert.Equal(t, int64(50), lags[0].Lag) + assert.Equal(t, int64(100), lags[0].LatestOffset) + assert.Equal(t, int64(50), lags[0].AckedOffset) + assert.Equal(t, int64(0), lags[1].Lag) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestConsumerLagNoMessages(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + // Consumer has offset but no messages remain (all acked and deleted) + rows := sqlmock.NewRows([]string{"consumer_group", "topic", "partition_key", "offset_acked", "latest_offset"}). + AddRow("group-1", "orders", "repo-1", 100, 0) + mock.ExpectQuery("SELECT .+ FROM queue_offsets .+ LEFT JOIN"). + WithArgs("orders", "orders"). + WillReturnRows(rows) + + lags, err := store.ConsumerLag(context.Background(), "orders") + require.NoError(t, err) + assert.Len(t, lags, 1) + assert.Equal(t, int64(0), lags[0].Lag) // clamped to 0, not negative + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestStaleLeases(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + rows := sqlmock.NewRows([]string{"consumer_group", "topic", "partition_key", "leased_by", "leased_at", "lease_renewed_at"}). + AddRow("group-1", "orders", "repo-1", "worker-1", 1000, 2000) + mock.ExpectQuery("SELECT .+ FROM queue_partition_leases WHERE lease_renewed_at < \\?"). + WithArgs(sqlmock.AnyArg()). + WillReturnRows(rows) + + leases, err := store.StaleLeases(context.Background(), 60000) + require.NoError(t, err) + assert.Len(t, leases, 1) + assert.Equal(t, "worker-1", leases[0].LeasedBy) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestStaleLeasesEmpty(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + store := NewAdminStore(db) + + rows := sqlmock.NewRows([]string{"consumer_group", "topic", "partition_key", "leased_by", "leased_at", "lease_renewed_at"}) + mock.ExpectQuery("SELECT .+ FROM queue_partition_leases WHERE lease_renewed_at < \\?"). + WithArgs(sqlmock.AnyArg()). + WillReturnRows(rows) + + leases, err := store.StaleLeases(context.Background(), 60000) + require.NoError(t, err) + assert.Empty(t, leases) + assert.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/extension/queue/mysql/ctl/lib/format.go b/extension/queue/mysql/ctl/lib/format.go new file mode 100644 index 00000000..5f734fbc --- /dev/null +++ b/extension/queue/mysql/ctl/lib/format.go @@ -0,0 +1,39 @@ +package lib + +import ( + "encoding/json" + "fmt" + "io" + "strings" + "text/tabwriter" + "time" +) + +// FormatTable writes rows as an aligned text table to w. +// headers defines column names. rows is a slice of slices where each inner +// slice corresponds to one row's cell values (pre-formatted as strings). +func FormatTable(w io.Writer, headers []string, rows [][]string) { + tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0) + fmt.Fprintln(tw, strings.Join(headers, "\t")) + fmt.Fprintln(tw, strings.Repeat("-\t", len(headers))) + for _, row := range rows { + fmt.Fprintln(tw, strings.Join(row, "\t")) + } + tw.Flush() +} + +// FormatJSON marshals v as indented JSON and writes it to w. +func FormatJSON(w io.Writer, v any) error { + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + return enc.Encode(v) +} + +// FormatMillis converts an epoch-millisecond timestamp to a human-readable +// string. Returns "-" for zero values (no timestamp set). +func FormatMillis(ms int64) string { + if ms == 0 { + return "-" + } + return time.UnixMilli(ms).UTC().Format(time.RFC3339) +} diff --git a/extension/queue/mysql/ctl/main.go b/extension/queue/mysql/ctl/main.go new file mode 100644 index 00000000..ddcdb8fe --- /dev/null +++ b/extension/queue/mysql/ctl/main.go @@ -0,0 +1,15 @@ +package main + +import ( + "fmt" + "os" + + _ "github.com/go-sql-driver/mysql" +) + +func main() { + if err := newRootCmd().Execute(); err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } +} diff --git a/extension/queue/mysql/publisher_test.go b/extension/queue/mysql/publisher_test.go index 2d61a23d..477be28b 100644 --- a/extension/queue/mysql/publisher_test.go +++ b/extension/queue/mysql/publisher_test.go @@ -10,6 +10,7 @@ import ( "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity/queue" extqueue "github.com/uber/submitqueue/extension/queue" // mocks in same package @@ -97,14 +98,14 @@ func TestPublisher_Publish(t *testing.T) { }, }, { - name: "publish with invalid topic name - special chars", + name: "publish with valid topic name - hyphens", topic: "topic-with-dash", messages: []queue.Message{ {ID: "msg1", Payload: []byte("p"), PartitionKey: "part1", PublishedAt: fixedTimestamp}, }, - wantErr: true, + wantErr: false, setupMock: func(m *MockmessageStore) { - // No Insert expected since validation fails + m.EXPECT().Insert(gomock.Any(), "topic-with-dash", gomock.Any()).Return(nil).Times(1) }, }, { @@ -219,9 +220,9 @@ func TestValidateTopicName(t *testing.T) { wantErr: true, }, { - name: "invalid topic - dash", + name: "valid topic - with hyphens", topicName: "my-topic", - wantErr: true, + wantErr: false, }, { name: "invalid topic - dot", @@ -266,6 +267,20 @@ func TestValidateTopicName(t *testing.T) { } } +// TestAllConsumerTopicsPassValidation ensures every topic defined in consumer.AllTopics +// passes MySQL topic name validation. This test will catch mismatches automatically +// when new topics are added. +func TestAllConsumerTopicsPassValidation(t *testing.T) { + require.NotEmpty(t, consumer.AllTopics, "AllTopics must not be empty") + + for _, topic := range consumer.AllTopics { + t.Run(string(topic), func(t *testing.T) { + err := validateTopicName(string(topic)) + require.NoError(t, err, "consumer topic %q must pass MySQL topic name validation", topic) + }) + } +} + func TestPublisher_PublishMetrics(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/extension/queue/mysql/validation.go b/extension/queue/mysql/validation.go index d792d711..3c2f71a9 100644 --- a/extension/queue/mysql/validation.go +++ b/extension/queue/mysql/validation.go @@ -2,7 +2,7 @@ package mysql import "fmt" -// validateTopicName ensures topic name is safe for use as SQL table name +// validateTopicName ensures topic name is safe for use as a SQL column value func validateTopicName(topic string) error { if topic == "" { return fmt.Errorf("topic name cannot be empty") @@ -10,10 +10,10 @@ func validateTopicName(topic string) error { if len(topic) > 255 { return fmt.Errorf("topic name too long (max 255 characters)") } - // Only allow lowercase letters, numbers, and underscores + // Only allow lowercase letters, numbers, underscores, and hyphens for _, c := range topic { - if !((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_') { - return fmt.Errorf("topic name must contain only lowercase letters, numbers, and underscores") + if !((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_' || c == '-') { + return fmt.Errorf("topic name must contain only lowercase letters, numbers, underscores, and hyphens") } } return nil diff --git a/go.mod b/go.mod index 9cd397d6..ca09d475 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/go-sql-driver/mysql v1.9.3 github.com/gogo/protobuf v1.3.2 + github.com/spf13/cobra v1.10.2 github.com/stretchr/testify v1.11.1 github.com/uber-go/tally/v4 v4.1.17 go.uber.org/fx v1.22.0 @@ -27,6 +28,7 @@ require ( github.com/golang/mock v1.7.0-rc.1 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/go-cmp v0.7.0 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -35,6 +37,7 @@ require ( github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/spf13/pflag v1.0.9 // indirect github.com/twmb/murmur3 v1.1.8 // indirect github.com/uber-go/tally v3.5.8+incompatible // indirect github.com/uber/tchannel-go v1.34.4 // indirect diff --git a/go.sum b/go.sum index fb915074..fa003055 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,7 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -69,6 +70,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc= github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= @@ -138,11 +141,16 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samuel/go-thrift v0.0.0-20191111193933-5165175b40af h1:EiWVfh8mr40yFZEui2oF0d45KgH48PkB2H0Z0GANvSI= github.com/samuel/go-thrift v0.0.0-20191111193933-5165175b40af/go.mod h1:Vrkh1pnjV9Bl8c3P9zH0/D4NlOHWP5d4/hF4YTULaec= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= +github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= +github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= +github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/streadway/quantile v0.0.0-20220407130108-4246515d968d h1:X4+kt6zM/OVO6gbJdAfJR60MGPsqCzbtXNnjoGqdfAs= github.com/streadway/quantile v0.0.0-20220407130108-4246515d968d/go.mod h1:lbP8tGiBjZ5YWIc2fzuRpTaz0b/53vT6PEs3QuAWzuU= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -193,6 +201,7 @@ go.uber.org/yarpc v1.81.0 h1:Zn4a0EmcbSx59lHH7RcYQpM+NE4TZUNkJjP3ARBVonc= go.uber.org/yarpc v1.81.0/go.mod h1:TaNLpGJjNYVqubGunV4Quw9MlK8vMIR/nHrc9BZhmJ8= go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/test/integration/extension/queue/mysql/BUILD.bazel b/test/integration/extension/queue/mysql/BUILD.bazel index 812d8a65..c02214a6 100644 --- a/test/integration/extension/queue/mysql/BUILD.bazel +++ b/test/integration/extension/queue/mysql/BUILD.bazel @@ -12,6 +12,7 @@ go_test( "//entity/queue", "//extension/queue", "//extension/queue/mysql", + "//extension/queue/mysql/ctl/lib", "//test/testutil", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_stretchr_testify//assert", diff --git a/test/integration/extension/queue/mysql/queue_test.go b/test/integration/extension/queue/mysql/queue_test.go index 2a005820..fc3988e4 100644 --- a/test/integration/extension/queue/mysql/queue_test.go +++ b/test/integration/extension/queue/mysql/queue_test.go @@ -19,6 +19,7 @@ import ( "github.com/uber/submitqueue/entity/queue" extqueue "github.com/uber/submitqueue/extension/queue" queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" + queueAdmin "github.com/uber/submitqueue/extension/queue/mysql/ctl/lib" "github.com/uber/submitqueue/test/testutil" ) @@ -1199,3 +1200,299 @@ drainLoop: t.Logf("Graceful shutdown test successful: %d messages recovered (including in-flight)", len(receivedIDs)) } + +// --- Admin CLI (ctl) integration tests --- +// These tests use the publisher/subscriber to create real state, +// then verify it using AdminStore. + +func (s *SQLQueueIntegrationSuite) TestAdmin_ListTopicsAfterPublish() { + t := s.T() + + topic := "admin_list_topics_test" + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q.Close() + + // Publish messages + publisher := q.Publisher() + require.NoError(t, publisher.Publish(s.ctx, topic, queue.NewMessage("msg-1", []byte("a"), "p1", nil))) + require.NoError(t, publisher.Publish(s.ctx, topic, queue.NewMessage("msg-2", []byte("b"), "p1", nil))) + + // Verify via AdminStore + admin := queueAdmin.NewAdminStore(s.db) + topics, err := admin.ListTopics(s.ctx) + require.NoError(t, err) + + found := false + for _, ti := range topics { + if ti.Topic == topic { + found = true + assert.Equal(t, int64(2), ti.MessageCount) + } + } + assert.True(t, found, "topic %q should appear in list-topics", topic) +} + +func (s *SQLQueueIntegrationSuite) TestAdmin_TopicStatsAfterPublish() { + t := s.T() + + topic := "admin_stats_test" + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q.Close() + + publisher := q.Publisher() + require.NoError(t, publisher.Publish(s.ctx, topic, queue.NewMessage("s1", []byte("x"), "p1", nil))) + require.NoError(t, publisher.Publish(s.ctx, topic, queue.NewMessage("s2", []byte("y"), "p2", nil))) + require.NoError(t, publisher.Publish(s.ctx, topic, queue.NewMessage("s3", []byte("z"), "p2", nil))) + + admin := queueAdmin.NewAdminStore(s.db) + stats, err := admin.GetTopicStats(s.ctx, topic, "_dlq") + require.NoError(t, err) + + assert.Equal(t, int64(3), stats.TotalMessages) + assert.Equal(t, int64(3), stats.VisibleMessages) + assert.Equal(t, int64(0), stats.InvisibleMessages) + assert.Equal(t, int64(2), stats.PartitionCount) // p1, p2 + assert.Equal(t, int64(0), stats.DLQCount) + + t.Logf("Topic stats verified: total=%d visible=%d partitions=%d", stats.TotalMessages, stats.VisibleMessages, stats.PartitionCount) +} + +func (s *SQLQueueIntegrationSuite) TestAdmin_InspectMessage() { + t := s.T() + + topic := "admin_inspect_test" + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q.Close() + + metadata := map[string]string{"env": "test", "trace": "abc"} + publisher := q.Publisher() + require.NoError(t, publisher.Publish(s.ctx, topic, queue.NewMessage("inspect-1", []byte("payload-data"), "p1", metadata))) + + admin := queueAdmin.NewAdminStore(s.db) + detail, found, err := admin.InspectMessage(s.ctx, topic, "inspect-1") + require.NoError(t, err) + assert.True(t, found) + assert.Equal(t, "inspect-1", detail.ID) + assert.Equal(t, topic, detail.Topic) + assert.Equal(t, "p1", detail.PartitionKey) + assert.Equal(t, []byte("payload-data"), detail.Payload) + assert.Equal(t, "test", detail.Metadata["env"]) + assert.Equal(t, "abc", detail.Metadata["trace"]) + + t.Logf("Inspect message verified: id=%s payload=%s metadata=%v", detail.ID, string(detail.Payload), detail.Metadata) +} + +func (s *SQLQueueIntegrationSuite) TestAdmin_DeleteAndPurge() { + t := s.T() + + topic := "admin_delete_test" + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q.Close() + + publisher := q.Publisher() + require.NoError(t, publisher.Publish(s.ctx, topic, queue.NewMessage("del-1", []byte("a"), "p1", nil))) + require.NoError(t, publisher.Publish(s.ctx, topic, queue.NewMessage("del-2", []byte("b"), "p1", nil))) + require.NoError(t, publisher.Publish(s.ctx, topic, queue.NewMessage("del-3", []byte("c"), "p1", nil))) + + admin := queueAdmin.NewAdminStore(s.db) + + // Delete single message + affected, err := admin.DeleteMessage(s.ctx, topic, "del-1") + require.NoError(t, err) + assert.Equal(t, int64(1), affected) + + // Verify it's gone + _, found, err := admin.InspectMessage(s.ctx, topic, "del-1") + require.NoError(t, err) + assert.False(t, found) + + // Purge remaining + affected, err = admin.PurgeTopic(s.ctx, topic) + require.NoError(t, err) + assert.Equal(t, int64(2), affected) + + // Verify topic is empty + msgs, err := admin.ListMessages(s.ctx, topic, "", 50) + require.NoError(t, err) + assert.Empty(t, msgs) +} + +func (s *SQLQueueIntegrationSuite) TestAdmin_ConsumerLagAfterPartialAck() { + t := s.T() + + topic := "admin_lag_test" + consumerGroup := "lag-consumer" + + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q.Close() + + publisher := q.Publisher() + subscriber := q.Subscriber() + + // Publish 5 messages to same partition + for i := 0; i < 5; i++ { + msg := queue.NewMessage(fmt.Sprintf("lag-%d", i), []byte("data"), "lag-partition", nil) + require.NoError(t, publisher.Publish(s.ctx, topic, msg)) + } + + // Subscribe and ack only 2 + subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", consumerGroup) + subConfig.PollIntervalMs = 100 + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) + require.NoError(t, err) + + receiveNWithTimeout(t, deliveryChan, 2, 5*time.Second, func(delivery extqueue.Delivery, index int) { + require.NoError(t, delivery.Ack(s.ctx)) + }) + + // Check consumer lag — should show lag > 0 + admin := queueAdmin.NewAdminStore(s.db) + lags, err := admin.ConsumerLag(s.ctx, topic) + require.NoError(t, err) + require.NotEmpty(t, lags) + + var found bool + for _, lag := range lags { + if lag.ConsumerGroup == consumerGroup && lag.PartitionKey == "lag-partition" { + found = true + assert.Greater(t, lag.LatestOffset, lag.AckedOffset, "latest should be ahead of acked") + assert.Greater(t, lag.Lag, int64(0), "lag should be positive with unacked messages") + t.Logf("Consumer lag verified: acked=%d latest=%d lag=%d", lag.AckedOffset, lag.LatestOffset, lag.Lag) + } + } + assert.True(t, found, "should find lag entry for consumer group %q", consumerGroup) +} + +func (s *SQLQueueIntegrationSuite) TestAdmin_LeasesAndOffsets() { + t := s.T() + + topic := "admin_leases_test" + consumerGroup := "lease-consumer" + + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q.Close() + + publisher := q.Publisher() + subscriber := q.Subscriber() + + // Publish and subscribe to create leases and offsets + require.NoError(t, publisher.Publish(s.ctx, topic, queue.NewMessage("lo-1", []byte("a"), "p1", nil))) + + subConfig := extqueue.DefaultSubscriptionConfig(topic, "admin-worker-1", consumerGroup) + subConfig.PollIntervalMs = 100 + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) + require.NoError(t, err) + + // Ack the message to create offset entries + delivery := receiveWithTimeout(t, deliveryChan, 5*time.Second) + require.NoError(t, delivery.Ack(s.ctx)) + + admin := queueAdmin.NewAdminStore(s.db) + + // Verify leases are visible + leases, err := admin.ListLeases(s.ctx) + require.NoError(t, err) + + var leaseFound bool + for _, l := range leases { + if l.ConsumerGroup == consumerGroup && l.Topic == topic { + leaseFound = true + assert.Equal(t, "admin-worker-1", l.LeasedBy) + assert.Greater(t, l.LeasedAt, int64(0)) + assert.Greater(t, l.LeaseRenewedAt, int64(0)) + t.Logf("Lease verified: group=%s topic=%s partition=%s leased_by=%s", l.ConsumerGroup, l.Topic, l.PartitionKey, l.LeasedBy) + } + } + assert.True(t, leaseFound, "should find lease for consumer group %q", consumerGroup) + + // Verify offsets are visible + offsets, err := admin.ListOffsets(s.ctx, consumerGroup) + require.NoError(t, err) + + var offsetFound bool + for _, o := range offsets { + if o.Topic == topic { + offsetFound = true + assert.Greater(t, o.OffsetAcked, int64(0), "offset should be > 0 after ack") + t.Logf("Offset verified: group=%s topic=%s partition=%s acked=%d", o.ConsumerGroup, o.Topic, o.PartitionKey, o.OffsetAcked) + } + } + assert.True(t, offsetFound, "should find offset for consumer group %q", consumerGroup) +} + +func (s *SQLQueueIntegrationSuite) TestAdmin_ResetOffsetAndReleaseLease() { + t := s.T() + + topic := "admin_reset_test" + consumerGroup := "reset-consumer" + + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q.Close() + + publisher := q.Publisher() + subscriber := q.Subscriber() + + // Publish, subscribe, ack — creates offsets and leases + require.NoError(t, publisher.Publish(s.ctx, topic, queue.NewMessage("r1", []byte("a"), "rp1", nil))) + + subConfig := extqueue.DefaultSubscriptionConfig(topic, "reset-worker", consumerGroup) + subConfig.PollIntervalMs = 100 + deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) + require.NoError(t, err) + + delivery := receiveWithTimeout(t, deliveryChan, 5*time.Second) + require.NoError(t, delivery.Ack(s.ctx)) + + admin := queueAdmin.NewAdminStore(s.db) + + // Reset offset to 0 + affected, err := admin.ResetOffset(s.ctx, consumerGroup, topic, "rp1", 0) + require.NoError(t, err) + assert.Equal(t, int64(1), affected) + + // Verify offset was reset + offsets, err := admin.ListOffsets(s.ctx, consumerGroup) + require.NoError(t, err) + for _, o := range offsets { + if o.Topic == topic && o.PartitionKey == "rp1" { + assert.Equal(t, int64(0), o.OffsetAcked, "offset should be reset to 0") + } + } + + // Release the lease + affected, err = admin.ReleaseLease(s.ctx, consumerGroup, topic, "rp1") + require.NoError(t, err) + assert.Equal(t, int64(1), affected) + + // Verify lease is gone + leases, err := admin.ListLeases(s.ctx) + require.NoError(t, err) + for _, l := range leases { + if l.ConsumerGroup == consumerGroup && l.Topic == topic && l.PartitionKey == "rp1" { + t.Errorf("lease should have been released but still exists") + } + } + + t.Logf("Reset offset and release lease verified") +}