diff --git a/doc/rfc/sql-queue-rfc.md b/doc/rfc/sql-queue-rfc.md index 18ba1905..64f5d846 100644 --- a/doc/rfc/sql-queue-rfc.md +++ b/doc/rfc/sql-queue-rfc.md @@ -115,6 +115,12 @@ We chose **custom database-backed queue** because: │ │ - consumer_group, topic │ │ │ │ - partition_key, offset │ │ │ └───────────────────────────────┘ │ +│ ┌───────────────────────────────┐ │ +│ │ queue_subscriber_heartbeats │ │ +│ │ - consumer_group, topic │ │ +│ │ - subscriber_name │ │ +│ │ - heartbeat_at │ │ +│ └───────────────────────────────┘ │ └─────────────────────────────────────┘ │ ▼ @@ -141,6 +147,8 @@ We chose **custom database-backed queue** because: **Offset Tracking:** Per-partition offsets enable crash recovery from last acked message. +**Fair Share Partitioning:** Subscribers register heartbeats so peers can compute even partition distribution. On each tick, subscribers compute `ceil(totalPartitions / activeSubscribers)` and release excess partitions if they hold more than their fair share. + ## Database Schema ### Messages Table @@ -186,21 +194,30 @@ See `extension/queue/mysql/schema/queue_partition_leases.sql` for full schema. See `extension/queue/mysql/schema/queue_offsets.sql` for full schema. -### Dead Letter Queue Table +### Subscriber Heartbeats Table **Key Fields:** -- `offset` (PK): Auto-incrementing offset for DLQ ordering -- `topic`, `partition_key`, `id`: Original message identification -- `payload`, `metadata`: Original message content -- `failed_at`: When message moved to DLQ -- `failure_count`, `last_error`: Failure diagnostics +- `consumer_group`, `topic`, `subscriber_name` (PK): Identifies the subscriber +- `heartbeat_at`: Last heartbeat timestamp (epoch ms) +- `deregistered_at`: When the subscriber was deregistered (0 = active) -**Indexes:** -- `(topic, partition_key, failed_at)`: Query DLQ by topic/partition, ordered by failure time -- `(failed_at)`: Time-based queries and cleanup -- `(topic, partition_key, id)`: Unique constraint, prevents duplicates +Used for fair share partition leasing — active subscribers (recent heartbeat) are counted to compute even partition distribution. + +See `extension/queue/mysql/schema/queue_subscriber_heartbeats.sql` for full schema. -See `extension/queue/mysql/schema/queue_dlq.sql` for full schema. +### Dead Letter Queue + +DLQ messages are reinserted into the `queue_messages` table with a DLQ topic suffix (e.g., `merge_events_dlq`). This avoids a separate table and allows DLQ messages to be consumed using the normal subscriber. + +**DLQ-specific fields on `queue_messages`:** +- `failed_at`: When the message was moved to DLQ (epoch ms) +- `failure_count`: How many times the message failed on the original topic before DLQ move +- `last_error`: The error message that triggered the DLQ move +- `original_topic`: The topic the message was originally published to + +Normal messages have these fields set to zero/empty. DLQ messages have `retry_count` reset to 0 since DLQ processing starts fresh. + +See `extension/queue/mysql/schema/queue_messages.sql` for full schema. ## Message Flow @@ -214,7 +231,7 @@ See `extension/queue/mysql/schema/queue_dlq.sql` for full schema. **5. Nack** - UPDATE `invisible_until` for retry after delay -**6. DLQ** - If `retry_count >= MaxAttempts`: DELETE from messages + INSERT into dlq +**6. DLQ** - If `retry_count >= MaxAttempts`: DELETE from messages + INSERT into messages with DLQ topic suffix ## Crash Recovery @@ -304,15 +321,18 @@ With our custom implementation: ## Observability -**Metrics (via tally):** -- Publisher: `messages_published`, `publish_errors` -- Subscriber: `messages_acked`, `messages_nacked`, `messages_moved_to_dlq`, `message_age`, `leases_acquired` -- Stores: `insert.latency`, `fetch.latency`, `ack_message.latency`, `renew_lease.latency` - -**Logging (via zap):** -- Debug: Message fetch, lease operations -- Info: Publish success, DLQ moves, partition acquisition -- Error: Database errors, unrecoverable failures +**Metrics (via tally, scoped with `queue_mysql_` prefix):** +- Publisher: `publish` (latency, success/error via `metrics.Begin`) +- Subscriber: `ack.messages_acked`, `nack.messages_nacked`, `reject.messages_rejected_to_dlq`, `poll_and_deliver.message_age`, `poll_and_deliver.messages_received`, `discover_and_reconcile.leases_acquired` +- Lease store: `try_acquire_lease`, `renew_lease`, `release_lease`, `get_leased_partitions`, `discover_and_acquire` (all with latency via `metrics.Begin`) +- Message store: `insert`, `fetch`, `delete`, `move_to_dlq`, `set_visibility` (all with latency via `metrics.Begin`) +- Heartbeat: `heartbeat.sent`, `heartbeat.errors`, `heartbeat.deregistrations` +- Fair share: `fair_share_cap.active_subscribers`, `rebalance.partitions_released` + +**Logging (via zap, named with `queue_mysql_` prefix):** +- Debug: Message fetch, lease operations, partition worker lifecycle +- Info: Subscription creation, DLQ moves, partition acquisition, rebalance events +- Warn: Lease renewal failures, heartbeat failures, retry limit exceeded, offset errors - Structured fields: `topic`, `partition_key`, `message_id`, `offset`, `retry_count` ## Performance diff --git a/extension/queue/mysql/README.md b/extension/queue/mysql/README.md index e5dd22f8..37f9b639 100644 --- a/extension/queue/mysql/README.md +++ b/extension/queue/mysql/README.md @@ -8,7 +8,8 @@ MySQL-based distributed queue with partition leasing, visibility timeout, and at - **Per-partition workers** — each leased partition gets its own goroutine for isolation - **Visibility timeout** — messages retry automatically if worker crashes - **At-least-once delivery** — offset tracking for crash recovery -- **Dead letter queue** — failed messages moved to DLQ after max retries +- **Dead letter queue** — failed messages moved to DLQ topic after max retries +- **Fair share partitioning** — subscriber heartbeats enable even partition distribution across workers ## Quick Start @@ -68,9 +69,6 @@ subConfig.Retry.MaxBackoffMs = 30000 // Max retry backoff (mill subConfig.Retry.BackoffMultiplier = 2.0 // Backoff multiplier for exponential backoff subConfig.DLQ.Enabled = true // Enable dead letter queue subConfig.DLQ.TopicSuffix = "_dlq" // DLQ topic suffix - -// Use config when subscribing -deliveryCh, _ := q.Subscriber().Subscribe(ctx, "my-topic", subConfig) ``` **Key Configuration Fields:** @@ -95,7 +93,9 @@ deliveryCh, _ := q.Subscriber().Subscribe(ctx, "my-topic", subConfig) Each subscription has a **supervisor goroutine** (`managePartitions`) that: 1. Discovers partitions from the messages table 2. Acquires and renews partition leases -3. Reconciles **per-partition worker goroutines** based on current leases +3. Sends heartbeats so other subscribers can compute fair shares +4. Rebalances partitions when subscribers join or leave +5. Reconciles **per-partition worker goroutines** based on current leases Each partition worker goroutine polls and delivers messages for its partition independently. This provides fault isolation — a slow or blocked partition does not affect other partitions. @@ -107,6 +107,16 @@ Subscribe() └── partitionWorker("part-3") ← polls & delivers ``` +### Fair Share Partitioning + +Subscribers register heartbeats in the `queue_subscriber_heartbeats` table. On each lease renewal tick, the supervisor: +1. Queries active subscribers (those with recent heartbeats) +2. Computes `ceil(totalPartitions / activeSubscribers)` as the fair share cap +3. If this subscriber owns more partitions than its fair share, releases excess partitions +4. On discovery, limits new lease acquisitions to the fair share cap + +This ensures even partition distribution when subscribers scale up or down. + ### Shutdown Sequence Shutdown uses two `sync.WaitGroup`s to ensure correctness: @@ -115,20 +125,21 @@ Shutdown uses two `sync.WaitGroup`s to ensure correctness: When `Close()` is called: 1. Subscription context is cancelled -2. `managePartitions` calls `stopAllWorkers` — cancels each worker and waits up to 5s per worker +2. `managePartitions` calls `stopAllWorkers` — cancels each worker and waits up to 30s per worker 3. Partition leases are released -4. `workerWg.Wait()` blocks until all workers have fully exited -5. `deliveryCh` is closed — safe because no workers can send after step 4 -6. `managePartitions` returns, `wg.Done()` fires -7. `Close()` returns +4. Subscriber heartbeat is deregistered +5. `workerWg.Wait()` blocks until all workers have fully exited +6. `deliveryCh` is closed — safe because no workers can send after step 5 +7. `managePartitions` returns, `wg.Done()` fires +8. `Close()` returns -The `workerWg.Wait()` step prevents a race where a slow worker (blocked on I/O past the 5s timeout) could send on a closed channel. +The `workerWg.Wait()` step prevents a race where a slow worker (blocked on I/O past the 30s timeout) could send on a closed channel. ### Worker Stop Behavior When a partition worker is stopped (lease lost or shutdown): - The worker is immediately removed from the workers map and its context is cancelled -- The caller waits up to 5s for the worker to confirm exit (logging a warning on timeout) +- The caller waits up to 30s for the worker to confirm exit (logging a warning on timeout) - `workerWg` tracks the worker regardless, so `Close()` always waits for full exit - If the worker times out, reconciliation is free to start a replacement — any brief overlap is harmless with at-least-once delivery semantics @@ -144,7 +155,7 @@ When a partition worker is stopped (lease lost or shutdown): 2. Process message 3. Ack: DELETE message, UPDATE offset 4. Nack: Message becomes visible after timeout -5. If retry_count >= MaxAttempts: Move to DLQ +5. If retry_count >= MaxAttempts: Move to DLQ topic (reinsert into messages table with DLQ topic suffix) **Crash Recovery:** - Messages become visible after visibility timeout @@ -158,3 +169,12 @@ Messages with same `PartitionKey` are processed in order by a single worker. ## Distributed Processing Multiple workers in the same consumer group share partitions. Workers in different consumer groups consume independently. + +## Error Types + +The package defines domain-specific sentinel errors for callers to check with `errors.Is` / `errors.As`: + +- `ErrPublisherClosed` — publishing to a closed publisher +- `ErrSubscriberClosed` — subscribing on a closed subscriber +- `ErrAlreadyAcknowledged` — ack/nack/reject on an already-processed delivery +- `ErrLeaseExpired` — lease operation on a partition no longer owned by this worker diff --git a/extension/queue/mysql/constants.go b/extension/queue/mysql/constants.go index 55f4dcb4..1d5c7fc9 100644 --- a/extension/queue/mysql/constants.go +++ b/extension/queue/mysql/constants.go @@ -17,16 +17,9 @@ package mysql // Common constants for frequently repeated strings across stores const ( - // Tag key (used in every Tagged() call) - tagErrorType = "error_type" - // Common log field names (used extensively across all stores) logTopic = "topic" logPartitionKey = "partition_key" logMessageID = "message_id" logError = "error" - - // Error types used across multiple methods/stores - errorBeginTx = "begin_transaction" - errorCommit = "commit" ) diff --git a/extension/queue/mysql/errors.go b/extension/queue/mysql/errors.go index 5a79a4fc..9bb0236c 100644 --- a/extension/queue/mysql/errors.go +++ b/extension/queue/mysql/errors.go @@ -14,9 +14,21 @@ package mysql -import "fmt" +import ( + "errors" + "fmt" +) -// ErrAlreadyAcknowledged is returned when attempting to ack/nack a delivery that was already processed +// Sentinel errors for closed resources. Callers can check these with errors.Is. +var ( + // ErrPublisherClosed is returned when publishing to a closed publisher. + ErrPublisherClosed = errors.New("publisher is closed") + + // ErrSubscriberClosed is returned when subscribing on a closed subscriber. + ErrSubscriberClosed = errors.New("subscriber is closed") +) + +// ErrAlreadyAcknowledged is returned when attempting to ack/nack a delivery that was already processed. type ErrAlreadyAcknowledged struct { DeliveryID string } @@ -24,3 +36,14 @@ type ErrAlreadyAcknowledged struct { func (e *ErrAlreadyAcknowledged) Error() string { return fmt.Sprintf("delivery %s already acknowledged or nacked", e.DeliveryID) } + +// ErrLeaseExpired is returned when a lease operation fails because the lease +// is not owned by this worker or has already expired. +type ErrLeaseExpired struct { + Topic string + PartitionKey string +} + +func (e *ErrLeaseExpired) Error() string { + return fmt.Sprintf("lease expired or not owned for topic %s partition %s", e.Topic, e.PartitionKey) +} diff --git a/extension/queue/mysql/message_store.go b/extension/queue/mysql/message_store.go index 8e5374fb..47e218d8 100644 --- a/extension/queue/mysql/message_store.go +++ b/extension/queue/mysql/message_store.go @@ -22,6 +22,7 @@ import ( "time" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "go.uber.org/zap" "github.com/uber/submitqueue/entity/queue" @@ -31,54 +32,30 @@ import ( type sqlmessageStore struct { db *sql.DB logger *zap.SugaredLogger - metrics tally.Scope + scope tally.Scope } -// Metric names for message store -const ( - metricInsertErrors = "insert.errors" - metricFetchErrors = "fetch.errors" - metricMoveToDLQErrors = "move_to_dlq.errors" -) - // newMessageStore creates a new SQL message store -func newMessageStore(db *sql.DB, logger *zap.Logger, metrics tally.Scope) messageStore { +func newMessageStore(db *sql.DB, logger *zap.Logger, scope tally.Scope) messageStore { return &sqlmessageStore{ db: db, - logger: logger.Sugar().Named("message_store"), - metrics: metrics.SubScope("message_store"), + logger: logger.Sugar().Named("queue_mysql_message_store"), + scope: scope.SubScope("queue_mysql_message_store"), } } // Insert inserts messages into the messages table -func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []queue.Message) error { - start := time.Now() - success := false - defer func() { - result := "error" - if success { - result = "success" - } - s.metrics.Tagged(map[string]string{"result": result}).Timer("insert.latency").Record(time.Since(start)) - }() +func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []queue.Message) (retErr error) { + op := metrics.Begin(s.scope, "insert") + defer func() { op.Complete(retErr) }() if len(messages) == 0 { return nil } - s.logger.Debugw("inserting messages", - logTopic, topic, - "count", len(messages), - ) - tx, err := s.db.BeginTx(ctx, nil) if err != nil { - s.logger.Errorw("failed to begin transaction", - logTopic, topic, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "begin_transaction"}).Counter(metricInsertErrors).Inc(1) - return fmt.Errorf("failed to begin transaction: %w", err) + return fmt.Errorf("failed to begin transaction for topic %s: %w", topic, err) } defer tx.Rollback() @@ -87,28 +64,17 @@ func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []q VALUES (?, ?, ?, ?, ?, ?, ?, 0, 0, 0, 0, '', '') `, MessagesTableName)) if err != nil { - s.logger.Errorw("failed to prepare statement", - logTopic, topic, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "prepare_statement"}).Counter(metricInsertErrors).Inc(1) - return fmt.Errorf("failed to prepare statement: %w", err) + return fmt.Errorf("failed to prepare statement for topic %s: %w", topic, err) } defer stmt.Close() - now := start.UnixMilli() + now := time.Now().UnixMilli() for _, msg := range messages { var metadataJSON []byte if len(msg.Metadata) > 0 { metadataJSON, err = json.Marshal(msg.Metadata) if err != nil { - s.logger.Errorw("failed to marshal metadata", - logTopic, topic, - logMessageID, msg.ID, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "marshal_metadata"}).Counter(metricInsertErrors).Inc(1) - return fmt.Errorf("failed to marshal metadata: %w", err) + return fmt.Errorf("failed to marshal metadata for message ID %s in topic %s: %w", msg.ID, topic, err) } } @@ -122,100 +88,52 @@ func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []q msg.PublishedAt, ) if err != nil { - s.logger.Errorw("failed to insert message", - logTopic, topic, - logMessageID, msg.ID, - logPartitionKey, msg.PartitionKey, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "exec_statement"}).Counter(metricInsertErrors).Inc(1) - return fmt.Errorf("failed to insert message: %w", err) + return fmt.Errorf("failed to insert message ID %s with partition key %s in topic %s: %w", msg.ID, msg.PartitionKey, topic, err) } } if err := tx.Commit(); err != nil { - s.logger.Errorw("failed to commit transaction", - logTopic, topic, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "commit"}).Counter(metricInsertErrors).Inc(1) - return fmt.Errorf("failed to commit transaction: %w", err) + return fmt.Errorf("failed to commit transaction for message ID %s with partition key %s in topic %s: %w", messages[0].ID, messages[0].PartitionKey, topic, err) } - s.metrics.Counter("insert.success").Inc(1) - s.metrics.Counter("messages.inserted").Inc(int64(len(messages))) - s.logger.Debugw("inserted messages", - logTopic, topic, - "count", len(messages), - "duration_ms", time.Since(start).Milliseconds(), - ) - - success = true + s.logger.Debugw("inserted messages", "count", len(messages), logTopic, topic) return nil } -// Delete deletes a message by topic and ID -func (s *sqlmessageStore) Delete(ctx context.Context, topic string, messageID string) error { - start := time.Now() - success := false - defer func() { - result := "error" - if success { - result = "success" - } - s.metrics.Tagged(map[string]string{"result": result}).Timer("delete.latency").Record(time.Since(start)) - }() +// Delete deletes a message by topic, partition key, and ID +func (s *sqlmessageStore) Delete(ctx context.Context, topic string, partitionKey string, messageID string) (retErr error) { + op := metrics.Begin(s.scope, "delete") + defer func() { op.Complete(retErr) }() result, err := s.db.ExecContext(ctx, fmt.Sprintf(` - DELETE FROM %s WHERE topic = ? AND id = ? - `, MessagesTableName), topic, messageID) + DELETE FROM %s WHERE topic = ? AND partition_key = ? AND id = ? + `, MessagesTableName), topic, partitionKey, messageID) if err != nil { - s.logger.Errorw("failed to delete message", - logTopic, topic, - logMessageID, messageID, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "exec_delete"}).Counter("delete.errors").Inc(1) - return err + return fmt.Errorf("failed to delete message %s in topic %s partition %s: %w", messageID, topic, partitionKey, err) } rows, _ := result.RowsAffected() - s.metrics.Counter("delete.success").Inc(1) if rows > 0 { - s.metrics.Counter("messages.deleted").Inc(rows) + metrics.NamedCounter(s.scope, "delete", "messages_deleted", rows) } - success = true return nil } // FetchByOffset fetches visible messages with offset > currentOffset for a specific partition // Atomically sets invisible_until and increments retry_count for fetched messages -func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, limit int, visibilityTimeoutMs int64) ([]messageRow, error) { - start := time.Now() - success := false - defer func() { - result := "error" - if success { - result = "success" - } - s.metrics.Tagged(map[string]string{"result": result}).Timer("fetch.latency").Record(time.Since(start)) - }() +func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, limit int, visibilityTimeoutMs int64) (_ []messageRow, retErr error) { + op := metrics.Begin(s.scope, "fetch") + defer func() { op.Complete(retErr) }() - now := start.UnixMilli() + now := time.Now().UnixMilli() invisibleUntil := now + visibilityTimeoutMs // Start transaction to atomically fetch and update messages tx, err := s.db.BeginTx(ctx, nil) if err != nil { - s.logger.Errorw("failed to begin transaction for fetch", - logTopic, topic, - logPartitionKey, partitionKey, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "begin_transaction"}).Counter(metricFetchErrors).Inc(1) - return nil, fmt.Errorf("failed to begin transaction: %w", err) + return nil, fmt.Errorf("failed to begin fetch transaction for topic %s partition %s: %w", topic, partitionKey, err) } defer tx.Rollback() @@ -228,13 +146,7 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti LIMIT ? `, MessagesTableName), topic, partitionKey, currentOffset, now, limit) if err != nil { - s.logger.Errorw("failed to query messages", - logTopic, topic, - logPartitionKey, partitionKey, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "query"}).Counter(metricFetchErrors).Inc(1) - return nil, fmt.Errorf("failed to query messages: %w", err) + return nil, fmt.Errorf("failed to query messages for topic %s partition %s: %w", topic, partitionKey, err) } defer rows.Close() @@ -257,26 +169,13 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti ) if err := rows.Scan(&offset, &id, &payload, &metadataJSON, &partKey, &retryCount, &publishedAtMilli, &failedAt, &failureCount, &lastError, &originalTopic); err != nil { - s.logger.Errorw("failed to scan message row", - logTopic, topic, - logPartitionKey, partitionKey, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "scan_row"}).Counter(metricFetchErrors).Inc(1) - return nil, fmt.Errorf("failed to scan row: %w", err) + return nil, fmt.Errorf("failed to scan message row for topic %s partition %s: %w", topic, partitionKey, err) } var metadata map[string]string if len(metadataJSON) > 0 { if err := json.Unmarshal(metadataJSON, &metadata); err != nil { - s.logger.Errorw("failed to unmarshal metadata", - logTopic, topic, - logPartitionKey, partitionKey, - logMessageID, id, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "unmarshal_metadata"}).Counter(metricFetchErrors).Inc(1) - return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) + return nil, fmt.Errorf("failed to unmarshal metadata for message %s in topic %s partition %s: %w", id, topic, partitionKey, err) } } if metadata == nil { @@ -301,13 +200,7 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti } if err := rows.Err(); err != nil { - s.logger.Errorw("row iteration error", - logTopic, topic, - logPartitionKey, partitionKey, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "row_iteration"}).Counter(metricFetchErrors).Inc(1) - return nil, fmt.Errorf("row iteration error: %w", err) + return nil, fmt.Errorf("row iteration error for topic %s partition %s: %w", topic, partitionKey, err) } // Update invisible_until and increment retry_count for fetched messages @@ -334,66 +227,37 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti _, err = tx.ExecContext(ctx, query, args...) if err != nil { - s.logger.Errorw("failed to update message visibility", - logTopic, topic, - logPartitionKey, partitionKey, - "message_count", len(messageIDs), - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "update_visibility"}).Counter(metricFetchErrors).Inc(1) - return nil, fmt.Errorf("failed to update messages: %w", err) + return nil, fmt.Errorf("failed to update visibility for %d messages in topic %s partition %s: %w", len(messageIDs), topic, partitionKey, err) } } if err := tx.Commit(); err != nil { - s.logger.Errorw("failed to commit fetch transaction", - logTopic, topic, - logPartitionKey, partitionKey, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "commit"}).Counter(metricFetchErrors).Inc(1) - return nil, fmt.Errorf("failed to commit transaction: %w", err) + return nil, fmt.Errorf("failed to commit fetch transaction for topic %s partition %s: %w", topic, partitionKey, err) } - s.metrics.Counter("fetch.success").Inc(1) - s.metrics.Counter("messages.fetched").Inc(int64(len(results))) + metrics.NamedCounter(s.scope, "fetch", "messages_fetched", int64(len(results))) s.logger.Debugw("fetched messages", logTopic, topic, logPartitionKey, partitionKey, "count", len(results), - "duration_ms", time.Since(start).Milliseconds(), ) - success = true return results, nil } // MoveToDLQ atomically moves a message to the DLQ by reinserting it with the DLQ topic name // The message is inserted back into queue_messages table with the DLQ topic (original + suffix) // This allows DLQ messages to be consumed using the normal subscriber -func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID string, failureCount int, lastError string, dlqTopicSuffix string) error { - start := time.Now() - success := false - defer func() { - result := "error" - if success { - result = "success" - } - s.metrics.Tagged(map[string]string{"result": result}).Timer("move_to_dlq.latency").Record(time.Since(start)) - }() +func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, partitionKey string, messageID string, failureCount int, lastError string, dlqTopicSuffix string) (retErr error) { + op := metrics.Begin(s.scope, "move_to_dlq") + defer func() { op.Complete(retErr) }() // Construct DLQ topic name dlqTopic := topic + dlqTopicSuffix tx, err := s.db.BeginTx(ctx, nil) if err != nil { - s.logger.Errorw("failed to begin transaction for DLQ move", - logTopic, topic, - logMessageID, messageID, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "begin_transaction"}).Counter(metricMoveToDLQErrors).Inc(1) - return fmt.Errorf("failed to begin transaction: %w", err) + return fmt.Errorf("failed to begin DLQ transaction for message %s in topic %s: %w", messageID, topic, err) } defer tx.Rollback() @@ -401,85 +265,55 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID var ( payload []byte metadataJSON []byte - partitionKey string createdAtMilli int64 publishedAtMilli int64 retryCount int ) err = tx.QueryRowContext(ctx, fmt.Sprintf(` - SELECT payload, metadata, partition_key, created_at, published_at, retry_count + SELECT payload, metadata, created_at, published_at, retry_count FROM %s - WHERE topic = ? AND id = ? - `, MessagesTableName), topic, messageID).Scan(&payload, &metadataJSON, &partitionKey, &createdAtMilli, &publishedAtMilli, &retryCount) + WHERE topic = ? AND partition_key = ? AND id = ? + `, MessagesTableName), topic, partitionKey, messageID).Scan(&payload, &metadataJSON, &createdAtMilli, &publishedAtMilli, &retryCount) if err != nil { if err == sql.ErrNoRows { // Message already deleted or doesn't exist - s.logger.Debugw("message not found for DLQ move", + s.logger.Warnw("message not found for DLQ move", logTopic, topic, logMessageID, messageID, ) return nil } - s.logger.Errorw("failed to fetch message for DLQ", - logTopic, topic, - logMessageID, messageID, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "fetch_message"}).Counter(metricMoveToDLQErrors).Inc(1) - return fmt.Errorf("failed to fetch message: %w", err) + return fmt.Errorf("failed to fetch message %s for DLQ in topic %s: %w", messageID, topic, err) } // Insert into queue_messages table with DLQ topic name and DLQ-specific fields // Reset retry_count to 0 since this is a new topic (DLQ processing starts fresh) // Store the original failure count for tracking purposes - now := start.UnixMilli() + now := time.Now().UnixMilli() _, err = tx.ExecContext(ctx, fmt.Sprintf(` INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, invisible_until, retry_count, failed_at, failure_count, last_error, original_topic) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, MessagesTableName), dlqTopic, messageID, payload, metadataJSON, partitionKey, createdAtMilli, publishedAtMilli, int64(0), 0, now, failureCount, lastError, topic) if err != nil { - s.logger.Errorw("failed to insert into DLQ topic", - logTopic, topic, - "dlq_topic", dlqTopic, - logMessageID, messageID, - logPartitionKey, partitionKey, - "failure_count", failureCount, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "insert_dlq"}).Counter(metricMoveToDLQErrors).Inc(1) - return fmt.Errorf("failed to insert into DLQ: %w", err) + return fmt.Errorf("failed to insert message %s into DLQ topic %s (partition %s, failure_count %d): %w", messageID, dlqTopic, partitionKey, failureCount, err) } // Delete from original topic _, err = tx.ExecContext(ctx, fmt.Sprintf(` - DELETE FROM %s WHERE topic = ? AND id = ? - `, MessagesTableName), topic, messageID) + DELETE FROM %s WHERE topic = ? AND partition_key = ? AND id = ? + `, MessagesTableName), topic, partitionKey, messageID) if err != nil { - s.logger.Errorw("failed to delete from main table after DLQ insert", - logTopic, topic, - logMessageID, messageID, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "delete_from_main"}).Counter(metricMoveToDLQErrors).Inc(1) - return fmt.Errorf("failed to delete from main table: %w", err) + return fmt.Errorf("failed to delete message %s from topic %s after DLQ insert: %w", messageID, topic, err) } if err := tx.Commit(); err != nil { - s.logger.Errorw("failed to commit DLQ transaction", - logTopic, topic, - logMessageID, messageID, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "commit"}).Counter(metricMoveToDLQErrors).Inc(1) - return fmt.Errorf("failed to commit transaction: %w", err) + return fmt.Errorf("failed to commit DLQ transaction for message %s in topic %s: %w", messageID, topic, err) } - s.metrics.Counter("move_to_dlq.success").Inc(1) - s.metrics.Counter("messages.moved_to_dlq").Inc(1) s.logger.Infow("moved message to DLQ", logTopic, topic, "dlq_topic", dlqTopic, @@ -487,10 +321,8 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID logPartitionKey, partitionKey, "failure_count", failureCount, "last_error", lastError, - "duration_ms", time.Since(start).Milliseconds(), ) - success = true return nil } @@ -498,20 +330,13 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID // visibilityTimeoutMillis: milliseconds from now to hide the message // If visibilityTimeoutMillis is 0, makes the message visible immediately // If visibilityTimeoutMillis > 0, makes the message invisible until now + visibilityTimeoutMillis -func (s *sqlmessageStore) SetVisibilityTimeout(ctx context.Context, topic string, messageID string, visibilityTimeoutMillis int64) error { - start := time.Now() - success := false - defer func() { - result := "error" - if success { - result = "success" - } - s.metrics.Tagged(map[string]string{"result": result}).Timer("set_visibility.latency").Record(time.Since(start)) - }() +func (s *sqlmessageStore) SetVisibilityTimeout(ctx context.Context, topic string, partitionKey string, messageID string, visibilityTimeoutMillis int64) (retErr error) { + op := metrics.Begin(s.scope, "set_visibility") + defer func() { op.Complete(retErr) }() var invisibleUntil int64 if visibilityTimeoutMillis > 0 { - invisibleUntil = start.UnixMilli() + visibilityTimeoutMillis + invisibleUntil = time.Now().UnixMilli() + visibilityTimeoutMillis } else { invisibleUntil = 0 } @@ -519,18 +344,11 @@ func (s *sqlmessageStore) SetVisibilityTimeout(ctx context.Context, topic string result, err := s.db.ExecContext(ctx, fmt.Sprintf(` UPDATE %s SET invisible_until = ? - WHERE topic = ? AND id = ? - `, MessagesTableName), invisibleUntil, topic, messageID) + WHERE topic = ? AND partition_key = ? AND id = ? + `, MessagesTableName), invisibleUntil, topic, partitionKey, messageID) if err != nil { - s.logger.Errorw("failed to set visibility timeout", - logTopic, topic, - logMessageID, messageID, - "timeout_ms", visibilityTimeoutMillis, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "exec_set"}).Counter("set_visibility.errors").Inc(1) - return fmt.Errorf("failed to set visibility timeout: %w", err) + return fmt.Errorf("failed to set visibility timeout for message %s in topic %s (timeout_ms %d): %w", messageID, topic, visibilityTimeoutMillis, err) } rows, err := result.RowsAffected() @@ -549,14 +367,11 @@ func (s *sqlmessageStore) SetVisibilityTimeout(ctx context.Context, topic string ) } - s.metrics.Counter("set_visibility.success").Inc(1) s.logger.Debugw("set visibility timeout", logTopic, topic, logMessageID, messageID, "timeout_ms", visibilityTimeoutMillis, - "duration_ms", time.Since(start).Milliseconds(), ) - success = true return nil } diff --git a/extension/queue/mysql/message_store_test.go b/extension/queue/mysql/message_store_test.go index 1f487aa9..79bfd4fd 100644 --- a/extension/queue/mysql/message_store_test.go +++ b/extension/queue/mysql/message_store_test.go @@ -28,7 +28,7 @@ import ( "github.com/uber/submitqueue/entity/queue" ) -// testMetrics returns a test metrics scope for use in tests +// testMetrics returns a test scope scope for use in tests func testMetrics() tally.Scope { return tally.NoopScope } @@ -105,10 +105,10 @@ func TestmessageStore_Delete(t *testing.T) { messageID := "msg1" mock.ExpectExec("DELETE FROM queue_messages"). - WithArgs(topic, messageID). + WithArgs(topic, "part1", messageID). WillReturnResult(sqlmock.NewResult(0, 1)) - err := store.Delete(ctx, topic, messageID) + err := store.Delete(ctx, topic, "part1", messageID) require.NoError(t, err) require.NoError(t, mock.ExpectationsWereMet()) } @@ -159,10 +159,10 @@ func TestmessageStore_SetVisibilityTimeout(t *testing.T) { visibilityTimeoutMillis := int64(5000) mock.ExpectExec("UPDATE queue_messages"). - WithArgs(sqlmock.AnyArg(), topic, messageID). + WithArgs(sqlmock.AnyArg(), topic, "part1", messageID). WillReturnResult(sqlmock.NewResult(0, 1)) - err := store.SetVisibilityTimeout(ctx, topic, messageID, visibilityTimeoutMillis) + err := store.SetVisibilityTimeout(ctx, topic, "part1", messageID, visibilityTimeoutMillis) require.NoError(t, err) require.NoError(t, mock.ExpectationsWereMet()) } @@ -182,12 +182,12 @@ func TestmessageStore_MoveToDLQ(t *testing.T) { // Expect transaction begin mock.ExpectBegin() - // Mock query for fetching message - SELECT payload, metadata, partition_key, created_at, published_at, retry_count - rows := sqlmock.NewRows([]string{"payload", "metadata", "partition_key", "created_at", "published_at", "retry_count"}). - AddRow([]byte("payload1"), []byte(`{"key":"value"}`), "part1", time.Now().UnixMilli(), time.Now().UnixMilli(), failureCount) + // Mock query for fetching message - SELECT payload, metadata, created_at, published_at, retry_count + rows := sqlmock.NewRows([]string{"payload", "metadata", "created_at", "published_at", "retry_count"}). + AddRow([]byte("payload1"), []byte(`{"key":"value"}`), time.Now().UnixMilli(), time.Now().UnixMilli(), failureCount) mock.ExpectQuery("SELECT (.+) FROM queue_messages"). - WithArgs(topic, messageID). + WithArgs(topic, "part1", messageID). WillReturnRows(rows) // Expect insert into queue_messages with DLQ topic and DLQ-specific columns @@ -199,13 +199,13 @@ func TestmessageStore_MoveToDLQ(t *testing.T) { // Expect delete from main table mock.ExpectExec("DELETE FROM queue_messages"). - WithArgs(topic, messageID). + WithArgs(topic, "part1", messageID). WillReturnResult(sqlmock.NewResult(0, 1)) // Expect commit mock.ExpectCommit() - err := store.MoveToDLQ(ctx, topic, messageID, failureCount, lastError, dlqTopicSuffix) + err := store.MoveToDLQ(ctx, topic, "part1", messageID, failureCount, lastError, dlqTopicSuffix) require.NoError(t, err) require.NoError(t, mock.ExpectationsWereMet()) } diff --git a/extension/queue/mysql/mock_stores.go b/extension/queue/mysql/mock_stores.go new file mode 100644 index 00000000..e3e64445 --- /dev/null +++ b/extension/queue/mysql/mock_stores.go @@ -0,0 +1,373 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: stores.go +// +// Generated by this command: +// +// mockgen -source=stores.go -destination=mock_stores.go -package=mysql +// + +// Package mysql is a generated GoMock package. +package mysql + +import ( + context "context" + reflect "reflect" + + queue "github.com/uber/submitqueue/entity/queue" + gomock "go.uber.org/mock/gomock" +) + +// MockmessageStore is a mock of messageStore interface. +type MockmessageStore struct { + ctrl *gomock.Controller + recorder *MockmessageStoreMockRecorder + isgomock struct{} +} + +// MockmessageStoreMockRecorder is the mock recorder for MockmessageStore. +type MockmessageStoreMockRecorder struct { + mock *MockmessageStore +} + +// NewMockmessageStore creates a new mock instance. +func NewMockmessageStore(ctrl *gomock.Controller) *MockmessageStore { + mock := &MockmessageStore{ctrl: ctrl} + mock.recorder = &MockmessageStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockmessageStore) EXPECT() *MockmessageStoreMockRecorder { + return m.recorder +} + +// Delete mocks base method. +func (m *MockmessageStore) Delete(ctx context.Context, topic, partitionKey, messageID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", ctx, topic, partitionKey, messageID) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete. +func (mr *MockmessageStoreMockRecorder) Delete(ctx, topic, partitionKey, messageID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockmessageStore)(nil).Delete), ctx, topic, partitionKey, messageID) +} + +// FetchByOffset mocks base method. +func (m *MockmessageStore) FetchByOffset(ctx context.Context, topic, partitionKey string, currentOffset int64, limit int, visibilityTimeoutMs int64) ([]messageRow, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchByOffset", ctx, topic, partitionKey, currentOffset, limit, visibilityTimeoutMs) + ret0, _ := ret[0].([]messageRow) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchByOffset indicates an expected call of FetchByOffset. +func (mr *MockmessageStoreMockRecorder) FetchByOffset(ctx, topic, partitionKey, currentOffset, limit, visibilityTimeoutMs any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchByOffset", reflect.TypeOf((*MockmessageStore)(nil).FetchByOffset), ctx, topic, partitionKey, currentOffset, limit, visibilityTimeoutMs) +} + +// Insert mocks base method. +func (m *MockmessageStore) Insert(ctx context.Context, topic string, messages []queue.Message) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Insert", ctx, topic, messages) + ret0, _ := ret[0].(error) + return ret0 +} + +// Insert indicates an expected call of Insert. +func (mr *MockmessageStoreMockRecorder) Insert(ctx, topic, messages any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Insert", reflect.TypeOf((*MockmessageStore)(nil).Insert), ctx, topic, messages) +} + +// MoveToDLQ mocks base method. +func (m *MockmessageStore) MoveToDLQ(ctx context.Context, topic, partitionKey, messageID string, failureCount int, lastError, dlqTopicSuffix string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MoveToDLQ", ctx, topic, partitionKey, messageID, failureCount, lastError, dlqTopicSuffix) + ret0, _ := ret[0].(error) + return ret0 +} + +// MoveToDLQ indicates an expected call of MoveToDLQ. +func (mr *MockmessageStoreMockRecorder) MoveToDLQ(ctx, topic, partitionKey, messageID, failureCount, lastError, dlqTopicSuffix any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveToDLQ", reflect.TypeOf((*MockmessageStore)(nil).MoveToDLQ), ctx, topic, partitionKey, messageID, failureCount, lastError, dlqTopicSuffix) +} + +// SetVisibilityTimeout mocks base method. +func (m *MockmessageStore) SetVisibilityTimeout(ctx context.Context, topic, partitionKey, messageID string, visibilityTimeoutMillis int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetVisibilityTimeout", ctx, topic, partitionKey, messageID, visibilityTimeoutMillis) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetVisibilityTimeout indicates an expected call of SetVisibilityTimeout. +func (mr *MockmessageStoreMockRecorder) SetVisibilityTimeout(ctx, topic, partitionKey, messageID, visibilityTimeoutMillis any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetVisibilityTimeout", reflect.TypeOf((*MockmessageStore)(nil).SetVisibilityTimeout), ctx, topic, partitionKey, messageID, visibilityTimeoutMillis) +} + +// MockoffsetStore is a mock of offsetStore interface. +type MockoffsetStore struct { + ctrl *gomock.Controller + recorder *MockoffsetStoreMockRecorder + isgomock struct{} +} + +// MockoffsetStoreMockRecorder is the mock recorder for MockoffsetStore. +type MockoffsetStoreMockRecorder struct { + mock *MockoffsetStore +} + +// NewMockoffsetStore creates a new mock instance. +func NewMockoffsetStore(ctrl *gomock.Controller) *MockoffsetStore { + mock := &MockoffsetStore{ctrl: ctrl} + mock.recorder = &MockoffsetStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockoffsetStore) EXPECT() *MockoffsetStoreMockRecorder { + return m.recorder +} + +// AckMessage mocks base method. +func (m *MockoffsetStore) AckMessage(ctx context.Context, topic, partitionKey, messageID string, offset int64, consumerGroup string, messageStore messageStore) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AckMessage", ctx, topic, partitionKey, messageID, offset, consumerGroup, messageStore) + ret0, _ := ret[0].(error) + return ret0 +} + +// AckMessage indicates an expected call of AckMessage. +func (mr *MockoffsetStoreMockRecorder) AckMessage(ctx, topic, partitionKey, messageID, offset, consumerGroup, messageStore any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AckMessage", reflect.TypeOf((*MockoffsetStore)(nil).AckMessage), ctx, topic, partitionKey, messageID, offset, consumerGroup, messageStore) +} + +// GetAckedOffset mocks base method. +func (m *MockoffsetStore) GetAckedOffset(ctx context.Context, topic, partitionKey, consumerGroup string) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAckedOffset", ctx, topic, partitionKey, consumerGroup) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAckedOffset indicates an expected call of GetAckedOffset. +func (mr *MockoffsetStoreMockRecorder) GetAckedOffset(ctx, topic, partitionKey, consumerGroup any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAckedOffset", reflect.TypeOf((*MockoffsetStore)(nil).GetAckedOffset), ctx, topic, partitionKey, consumerGroup) +} + +// Initialize mocks base method. +func (m *MockoffsetStore) Initialize(ctx context.Context, topic, partitionKey, consumerGroup string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Initialize", ctx, topic, partitionKey, consumerGroup) + ret0, _ := ret[0].(error) + return ret0 +} + +// Initialize indicates an expected call of Initialize. +func (mr *MockoffsetStoreMockRecorder) Initialize(ctx, topic, partitionKey, consumerGroup any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Initialize", reflect.TypeOf((*MockoffsetStore)(nil).Initialize), ctx, topic, partitionKey, consumerGroup) +} + +// UpdateAckedOffset mocks base method. +func (m *MockoffsetStore) UpdateAckedOffset(ctx context.Context, topic, partitionKey string, offset int64, consumerGroup string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateAckedOffset", ctx, topic, partitionKey, offset, consumerGroup) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateAckedOffset indicates an expected call of UpdateAckedOffset. +func (mr *MockoffsetStoreMockRecorder) UpdateAckedOffset(ctx, topic, partitionKey, offset, consumerGroup any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAckedOffset", reflect.TypeOf((*MockoffsetStore)(nil).UpdateAckedOffset), ctx, topic, partitionKey, offset, consumerGroup) +} + +// MockpartitionLeaseStore is a mock of partitionLeaseStore interface. +type MockpartitionLeaseStore struct { + ctrl *gomock.Controller + recorder *MockpartitionLeaseStoreMockRecorder + isgomock struct{} +} + +// MockpartitionLeaseStoreMockRecorder is the mock recorder for MockpartitionLeaseStore. +type MockpartitionLeaseStoreMockRecorder struct { + mock *MockpartitionLeaseStore +} + +// NewMockpartitionLeaseStore creates a new mock instance. +func NewMockpartitionLeaseStore(ctrl *gomock.Controller) *MockpartitionLeaseStore { + mock := &MockpartitionLeaseStore{ctrl: ctrl} + mock.recorder = &MockpartitionLeaseStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockpartitionLeaseStore) EXPECT() *MockpartitionLeaseStoreMockRecorder { + return m.recorder +} + +// DiscoverAndAcquirePartitions mocks base method. +func (m *MockpartitionLeaseStore) DiscoverAndAcquirePartitions(ctx context.Context, topic, subscriberName, consumerGroup string, leaseDurationMs int64, maxPartitions int) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DiscoverAndAcquirePartitions", ctx, topic, subscriberName, consumerGroup, leaseDurationMs, maxPartitions) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DiscoverAndAcquirePartitions indicates an expected call of DiscoverAndAcquirePartitions. +func (mr *MockpartitionLeaseStoreMockRecorder) DiscoverAndAcquirePartitions(ctx, topic, subscriberName, consumerGroup, leaseDurationMs, maxPartitions any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiscoverAndAcquirePartitions", reflect.TypeOf((*MockpartitionLeaseStore)(nil).DiscoverAndAcquirePartitions), ctx, topic, subscriberName, consumerGroup, leaseDurationMs, maxPartitions) +} + +// DiscoverPartitions mocks base method. +func (m *MockpartitionLeaseStore) DiscoverPartitions(ctx context.Context, topic string) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DiscoverPartitions", ctx, topic) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DiscoverPartitions indicates an expected call of DiscoverPartitions. +func (mr *MockpartitionLeaseStoreMockRecorder) DiscoverPartitions(ctx, topic any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiscoverPartitions", reflect.TypeOf((*MockpartitionLeaseStore)(nil).DiscoverPartitions), ctx, topic) +} + +// GetLeasedPartitions mocks base method. +func (m *MockpartitionLeaseStore) GetLeasedPartitions(ctx context.Context, topic, subscriberName, consumerGroup string) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLeasedPartitions", ctx, topic, subscriberName, consumerGroup) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetLeasedPartitions indicates an expected call of GetLeasedPartitions. +func (mr *MockpartitionLeaseStoreMockRecorder) GetLeasedPartitions(ctx, topic, subscriberName, consumerGroup any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLeasedPartitions", reflect.TypeOf((*MockpartitionLeaseStore)(nil).GetLeasedPartitions), ctx, topic, subscriberName, consumerGroup) +} + +// ReleaseLease mocks base method. +func (m *MockpartitionLeaseStore) ReleaseLease(ctx context.Context, topic, partitionKey, subscriberName, consumerGroup string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReleaseLease", ctx, topic, partitionKey, subscriberName, consumerGroup) + ret0, _ := ret[0].(error) + return ret0 +} + +// ReleaseLease indicates an expected call of ReleaseLease. +func (mr *MockpartitionLeaseStoreMockRecorder) ReleaseLease(ctx, topic, partitionKey, subscriberName, consumerGroup any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReleaseLease", reflect.TypeOf((*MockpartitionLeaseStore)(nil).ReleaseLease), ctx, topic, partitionKey, subscriberName, consumerGroup) +} + +// RenewLease mocks base method. +func (m *MockpartitionLeaseStore) RenewLease(ctx context.Context, topic, partitionKey, subscriberName, consumerGroup string, leaseDurationMs int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RenewLease", ctx, topic, partitionKey, subscriberName, consumerGroup, leaseDurationMs) + ret0, _ := ret[0].(error) + return ret0 +} + +// RenewLease indicates an expected call of RenewLease. +func (mr *MockpartitionLeaseStoreMockRecorder) RenewLease(ctx, topic, partitionKey, subscriberName, consumerGroup, leaseDurationMs any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RenewLease", reflect.TypeOf((*MockpartitionLeaseStore)(nil).RenewLease), ctx, topic, partitionKey, subscriberName, consumerGroup, leaseDurationMs) +} + +// TryAcquireLease mocks base method. +func (m *MockpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic, partitionKey, subscriberName, consumerGroup string, leaseDurationMs int64) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TryAcquireLease", ctx, topic, partitionKey, subscriberName, consumerGroup, leaseDurationMs) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TryAcquireLease indicates an expected call of TryAcquireLease. +func (mr *MockpartitionLeaseStoreMockRecorder) TryAcquireLease(ctx, topic, partitionKey, subscriberName, consumerGroup, leaseDurationMs any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryAcquireLease", reflect.TypeOf((*MockpartitionLeaseStore)(nil).TryAcquireLease), ctx, topic, partitionKey, subscriberName, consumerGroup, leaseDurationMs) +} + +// MocksubscriberHeartbeatStore is a mock of subscriberHeartbeatStore interface. +type MocksubscriberHeartbeatStore struct { + ctrl *gomock.Controller + recorder *MocksubscriberHeartbeatStoreMockRecorder + isgomock struct{} +} + +// MocksubscriberHeartbeatStoreMockRecorder is the mock recorder for MocksubscriberHeartbeatStore. +type MocksubscriberHeartbeatStoreMockRecorder struct { + mock *MocksubscriberHeartbeatStore +} + +// NewMocksubscriberHeartbeatStore creates a new mock instance. +func NewMocksubscriberHeartbeatStore(ctrl *gomock.Controller) *MocksubscriberHeartbeatStore { + mock := &MocksubscriberHeartbeatStore{ctrl: ctrl} + mock.recorder = &MocksubscriberHeartbeatStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MocksubscriberHeartbeatStore) EXPECT() *MocksubscriberHeartbeatStoreMockRecorder { + return m.recorder +} + +// ActiveSubscribers mocks base method. +func (m *MocksubscriberHeartbeatStore) ActiveSubscribers(ctx context.Context, topic, consumerGroup string, staleDurationMs int64) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ActiveSubscribers", ctx, topic, consumerGroup, staleDurationMs) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ActiveSubscribers indicates an expected call of ActiveSubscribers. +func (mr *MocksubscriberHeartbeatStoreMockRecorder) ActiveSubscribers(ctx, topic, consumerGroup, staleDurationMs any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ActiveSubscribers", reflect.TypeOf((*MocksubscriberHeartbeatStore)(nil).ActiveSubscribers), ctx, topic, consumerGroup, staleDurationMs) +} + +// Deregister mocks base method. +func (m *MocksubscriberHeartbeatStore) Deregister(ctx context.Context, topic, subscriberName, consumerGroup string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Deregister", ctx, topic, subscriberName, consumerGroup) + ret0, _ := ret[0].(error) + return ret0 +} + +// Deregister indicates an expected call of Deregister. +func (mr *MocksubscriberHeartbeatStoreMockRecorder) Deregister(ctx, topic, subscriberName, consumerGroup any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Deregister", reflect.TypeOf((*MocksubscriberHeartbeatStore)(nil).Deregister), ctx, topic, subscriberName, consumerGroup) +} + +// Heartbeat mocks base method. +func (m *MocksubscriberHeartbeatStore) Heartbeat(ctx context.Context, topic, subscriberName, consumerGroup string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Heartbeat", ctx, topic, subscriberName, consumerGroup) + ret0, _ := ret[0].(error) + return ret0 +} + +// Heartbeat indicates an expected call of Heartbeat. +func (mr *MocksubscriberHeartbeatStoreMockRecorder) Heartbeat(ctx, topic, subscriberName, consumerGroup any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Heartbeat", reflect.TypeOf((*MocksubscriberHeartbeatStore)(nil).Heartbeat), ctx, topic, subscriberName, consumerGroup) +} diff --git a/extension/queue/mysql/offset_store.go b/extension/queue/mysql/offset_store.go index 07adc788..a8cc0899 100644 --- a/extension/queue/mysql/offset_store.go +++ b/extension/queue/mysql/offset_store.go @@ -21,6 +21,7 @@ import ( "time" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "go.uber.org/zap" ) @@ -28,25 +29,23 @@ import ( type sqloffsetStore struct { db *sql.DB logger *zap.SugaredLogger - metrics tally.Scope + scope tally.Scope } -// Metric names for offset store -const ( - metricAckMessageErrors = "ack_message.errors" -) - // newOffsetStore creates a new SQL offset store -func newOffsetStore(db *sql.DB, logger *zap.Logger, metrics tally.Scope) offsetStore { +func newOffsetStore(db *sql.DB, logger *zap.Logger, scope tally.Scope) offsetStore { return &sqloffsetStore{ db: db, - logger: logger.Sugar().Named("offset_store"), - metrics: metrics.SubScope("offset_store"), + logger: logger.Sugar().Named("queue_mysql_offset_store"), + scope: scope.SubScope("queue_mysql_offset_store"), } } // Initialize creates an offset entry for a topic+partition if it doesn't exist -func (s *sqloffsetStore) Initialize(ctx context.Context, topic string, partitionKey string, consumerGroup string) error { +func (s *sqloffsetStore) Initialize(ctx context.Context, topic string, partitionKey string, consumerGroup string) (retErr error) { + op := metrics.Begin(s.scope, "initialize") + defer func() { op.Complete(retErr) }() + now := time.Now().UnixMilli() // Try to insert, ignore if already exists @@ -55,11 +54,18 @@ func (s *sqloffsetStore) Initialize(ctx context.Context, topic string, partition VALUES (?, ?, ?, 0, ?) `, OffsetsTableName), consumerGroup, topic, partitionKey, now) - return err + if err != nil { + return fmt.Errorf("failed to initialize offset for topic %s partition %s: %w", topic, partitionKey, err) + } + + return nil } // GetAckedOffset returns the current acked offset for a topic+partition -func (s *sqloffsetStore) GetAckedOffset(ctx context.Context, topic string, partitionKey string, consumerGroup string) (int64, error) { +func (s *sqloffsetStore) GetAckedOffset(ctx context.Context, topic string, partitionKey string, consumerGroup string) (_ int64, retErr error) { + op := metrics.Begin(s.scope, "get_acked_offset") + defer func() { op.Complete(retErr) }() + var offset int64 err := s.db.QueryRowContext(ctx, fmt.Sprintf(` SELECT offset_acked FROM %s WHERE consumer_group = ? AND topic = ? AND partition_key = ? @@ -71,14 +77,17 @@ func (s *sqloffsetStore) GetAckedOffset(ctx context.Context, topic string, parti } if err != nil { - return 0, fmt.Errorf("failed to get acked offset: %w", err) + return 0, fmt.Errorf("failed to get acked offset for topic %s partition %s: %w", topic, partitionKey, err) } return offset, nil } // UpdateAckedOffset updates the offset_acked for a topic+partition (only if new offset is greater) -func (s *sqloffsetStore) UpdateAckedOffset(ctx context.Context, topic string, partitionKey string, offset int64, consumerGroup string) error { +func (s *sqloffsetStore) UpdateAckedOffset(ctx context.Context, topic string, partitionKey string, offset int64, consumerGroup string) (retErr error) { + op := metrics.Begin(s.scope, "update_acked_offset") + defer func() { op.Complete(retErr) }() + now := time.Now().UnixMilli() _, err := s.db.ExecContext(ctx, fmt.Sprintf(` @@ -87,25 +96,21 @@ func (s *sqloffsetStore) UpdateAckedOffset(ctx context.Context, topic string, pa WHERE consumer_group = ? AND topic = ? AND partition_key = ? AND offset_acked < ? `, OffsetsTableName), offset, now, consumerGroup, topic, partitionKey, offset) - return err + if err != nil { + return fmt.Errorf("failed to update acked offset for topic %s partition %s: %w", topic, partitionKey, err) + } + + return nil } // AckMessage atomically deletes a message and updates the acked offset -func (s *sqloffsetStore) AckMessage(ctx context.Context, topic string, partitionKey string, messageID string, offset int64, consumerGroup string, messageStore messageStore) error { - start := time.Now() - success := false - defer func() { - result := "error" - if success { - result = "success" - } - s.metrics.Tagged(map[string]string{"result": result}).Timer("ack_message.latency").Record(time.Since(start)) - }() +func (s *sqloffsetStore) AckMessage(ctx context.Context, topic string, partitionKey string, messageID string, offset int64, consumerGroup string, messageStore messageStore) (retErr error) { + op := metrics.Begin(s.scope, "ack_message") + defer func() { op.Complete(retErr) }() tx, err := s.db.BeginTx(ctx, nil) if err != nil { - s.metrics.Tagged(map[string]string{tagErrorType: "begin_transaction"}).Counter(metricAckMessageErrors).Inc(1) - return fmt.Errorf("failed to begin transaction: %w", err) + return fmt.Errorf("failed to begin transaction for topic %s partition %s: %w", topic, partitionKey, err) } defer tx.Rollback() @@ -114,11 +119,10 @@ func (s *sqloffsetStore) AckMessage(ctx context.Context, topic string, partition DELETE FROM %s WHERE topic = ? AND partition_key = ? AND id = ? `, MessagesTableName), topic, partitionKey, messageID) if err != nil { - s.metrics.Tagged(map[string]string{tagErrorType: "delete_message"}).Counter(metricAckMessageErrors).Inc(1) - return fmt.Errorf("failed to delete message: %w", err) + return fmt.Errorf("failed to delete message %s in topic %s partition %s: %w", messageID, topic, partitionKey, err) } - now := start.UnixMilli() + now := time.Now().UnixMilli() // Update offset_acked (insert if not exists) _, err = tx.ExecContext(ctx, fmt.Sprintf(` @@ -129,25 +133,19 @@ func (s *sqloffsetStore) AckMessage(ctx context.Context, topic string, partition updated_at = VALUES(updated_at) `, OffsetsTableName), consumerGroup, topic, partitionKey, offset, now) if err != nil { - s.metrics.Tagged(map[string]string{tagErrorType: "update_offset"}).Counter(metricAckMessageErrors).Inc(1) - return fmt.Errorf("failed to update offset: %w", err) + return fmt.Errorf("failed to update offset for topic %s partition %s: %w", topic, partitionKey, err) } if err := tx.Commit(); err != nil { - s.metrics.Tagged(map[string]string{tagErrorType: "commit"}).Counter(metricAckMessageErrors).Inc(1) - return fmt.Errorf("failed to commit transaction: %w", err) + return fmt.Errorf("failed to commit transaction for topic %s partition %s: %w", topic, partitionKey, err) } - // Log and emit metrics after transaction completes - s.metrics.Counter("ack_message.success").Inc(1) s.logger.Debugw("acked message", logTopic, topic, logPartitionKey, partitionKey, logMessageID, messageID, "offset", offset, - "duration_ms", time.Since(start).Milliseconds(), ) - success = true return nil } diff --git a/extension/queue/mysql/partition_lease_store.go b/extension/queue/mysql/partition_lease_store.go index 920884a3..26f4139c 100644 --- a/extension/queue/mysql/partition_lease_store.go +++ b/extension/queue/mysql/partition_lease_store.go @@ -21,7 +21,7 @@ import ( "time" "github.com/uber-go/tally/v4" - coremetrics "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/core/metrics" "go.uber.org/zap" ) @@ -29,39 +29,24 @@ import ( type sqlpartitionLeaseStore struct { db *sql.DB logger *zap.SugaredLogger - metrics tally.Scope + scope tally.Scope } -// Metric names for partition lease store -const ( - metricTryAcquireLeaseErrors = "try_acquire_lease.errors" - metricRenewLeaseErrors = "renew_lease.errors" - metricGetLeasedPartitionsErrors = "get_leased_partitions.errors" - metricDiscoverAndAcquireErrors = "discover_and_acquire.errors" -) - // newPartitionLeaseStore creates a new SQL partition lease store func newPartitionLeaseStore(db *sql.DB, logger *zap.Logger, scope tally.Scope) partitionLeaseStore { return &sqlpartitionLeaseStore{ db: db, - logger: logger.Sugar().Named("partition_lease_store"), - metrics: scope.SubScope("partition_lease_store"), + logger: logger.Sugar().Named("queue_mysql_partition_lease_store"), + scope: scope.SubScope("queue_mysql_partition_lease_store"), } } // TryAcquireLease attempts to acquire or renew a lease for a partition -func (s *sqlpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic string, partitionKey string, subscriberName string, consumerGroup string, leaseDurationMs int64) (bool, error) { - start := time.Now() - success := false - defer func() { - result := "error" - if success { - result = "success" - } - s.metrics.Tagged(map[string]string{"result": result}).Timer("try_acquire_lease.latency").Record(time.Since(start)) - }() +func (s *sqlpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic string, partitionKey string, subscriberName string, consumerGroup string, leaseDurationMs int64) (_ bool, retErr error) { + op := metrics.Begin(s.scope, "try_acquire_lease") + defer func() { op.Complete(retErr) }() - now := start.UnixMilli() + now := time.Now().UnixMilli() staleThreshold := now - leaseDurationMs // Try to insert or update stale lease @@ -77,13 +62,7 @@ func (s *sqlpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic stri staleThreshold, staleThreshold, staleThreshold) if err != nil { - s.logger.Errorw("failed to acquire lease", - logTopic, topic, - logPartitionKey, partitionKey, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "exec_acquire"}).Counter(metricTryAcquireLeaseErrors).Inc(1) - return false, fmt.Errorf("failed to acquire lease: %w", err) + return false, fmt.Errorf("failed to acquire lease for topic %s partition %s: %w", topic, partitionKey, err) } // Check if we own the lease @@ -94,43 +73,29 @@ func (s *sqlpartitionLeaseStore) TryAcquireLease(ctx context.Context, topic stri `, PartitionLeasesTableName), consumerGroup, topic, partitionKey).Scan(&owner) if err != nil { - s.logger.Errorw("failed to check lease ownership", - logTopic, topic, - logPartitionKey, partitionKey, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "check_ownership"}).Counter(metricTryAcquireLeaseErrors).Inc(1) - return false, fmt.Errorf("failed to check lease ownership: %w", err) + return false, fmt.Errorf("failed to check lease ownership for topic %s partition %s: %w", topic, partitionKey, err) } acquired := owner == subscriberName if acquired { - s.metrics.Counter("try_acquire_lease.acquired").Inc(1) + metrics.NamedCounter(s.scope, "try_acquire_lease", "acquired", 1) s.logger.Debugw("acquired lease", logTopic, topic, logPartitionKey, partitionKey, ) } else { - s.metrics.Counter("try_acquire_lease.not_acquired").Inc(1) + metrics.NamedCounter(s.scope, "try_acquire_lease", "not_acquired", 1) } - success = true return acquired, nil } // RenewLease renews the lease for a partition owned by this worker -func (s *sqlpartitionLeaseStore) RenewLease(ctx context.Context, topic string, partitionKey string, subscriberName string, consumerGroup string, leaseDurationMs int64) error { - start := time.Now() - success := false - defer func() { - result := "error" - if success { - result = "success" - } - s.metrics.Tagged(map[string]string{"result": result}).Timer("renew_lease.latency").Record(time.Since(start)) - }() +func (s *sqlpartitionLeaseStore) RenewLease(ctx context.Context, topic string, partitionKey string, subscriberName string, consumerGroup string, leaseDurationMs int64) (retErr error) { + op := metrics.Begin(s.scope, "renew_lease") + defer func() { op.Complete(retErr) }() - now := start.UnixMilli() + now := time.Now().UnixMilli() result, err := s.db.ExecContext(ctx, fmt.Sprintf(` UPDATE %s @@ -139,24 +104,12 @@ func (s *sqlpartitionLeaseStore) RenewLease(ctx context.Context, topic string, p `, PartitionLeasesTableName), now, consumerGroup, topic, partitionKey, subscriberName) if err != nil { - s.logger.Errorw("failed to renew lease", - logTopic, topic, - logPartitionKey, partitionKey, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "exec_renew"}).Counter(metricRenewLeaseErrors).Inc(1) - return fmt.Errorf("failed to renew lease: %w", err) + return fmt.Errorf("failed to renew lease for topic %s partition %s: %w", topic, partitionKey, err) } rows, err := result.RowsAffected() if err != nil { - s.logger.Errorw("failed to check renewal result", - logTopic, topic, - logPartitionKey, partitionKey, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "check_rows_affected"}).Counter(metricRenewLeaseErrors).Inc(1) - return fmt.Errorf("failed to check renewal result: %w", err) + return fmt.Errorf("failed to check renewal result for topic %s partition %s: %w", topic, partitionKey, err) } if rows == 0 { @@ -164,32 +117,21 @@ func (s *sqlpartitionLeaseStore) RenewLease(ctx context.Context, topic string, p logTopic, topic, logPartitionKey, partitionKey, ) - s.metrics.Tagged(map[string]string{tagErrorType: "not_owned"}).Counter(metricRenewLeaseErrors).Inc(1) - return fmt.Errorf("lease not owned by this worker or already expired") + return &ErrLeaseExpired{Topic: topic, PartitionKey: partitionKey} } - s.metrics.Counter("renew_lease.success").Inc(1) s.logger.Debugw("renewed lease", logTopic, topic, logPartitionKey, partitionKey, - "duration_ms", time.Since(start).Milliseconds(), ) - success = true return nil } // ReleaseLease releases the lease for a partition owned by this worker -func (s *sqlpartitionLeaseStore) ReleaseLease(ctx context.Context, topic string, partitionKey string, subscriberName string, consumerGroup string) error { - start := time.Now() - success := false - defer func() { - result := "error" - if success { - result = "success" - } - s.metrics.Tagged(map[string]string{"result": result}).Timer("release_lease.latency").Record(time.Since(start)) - }() +func (s *sqlpartitionLeaseStore) ReleaseLease(ctx context.Context, topic string, partitionKey string, subscriberName string, consumerGroup string) (retErr error) { + op := metrics.Begin(s.scope, "release_lease") + defer func() { op.Complete(retErr) }() result, err := s.db.ExecContext(ctx, fmt.Sprintf(` DELETE FROM %s @@ -197,41 +139,25 @@ func (s *sqlpartitionLeaseStore) ReleaseLease(ctx context.Context, topic string, `, PartitionLeasesTableName), consumerGroup, topic, partitionKey, subscriberName) if err != nil { - s.logger.Errorw("failed to release lease", - logTopic, topic, - logPartitionKey, partitionKey, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "exec_release"}).Counter("release_lease.errors").Inc(1) - return fmt.Errorf("failed to release lease: %w", err) + return fmt.Errorf("failed to release lease for topic %s partition %s: %w", topic, partitionKey, err) } - // Only increment success counter if we actually deleted a row (idempotent) + // Only log if we actually deleted a row (idempotent) rows, _ := result.RowsAffected() if rows > 0 { - s.metrics.Counter("release_lease.success").Inc(1) s.logger.Debugw("released lease", logTopic, topic, logPartitionKey, partitionKey, - "duration_ms", time.Since(start).Milliseconds(), ) } - success = true return nil } // GetLeasedPartitions returns all partitions currently leased by this worker -func (s *sqlpartitionLeaseStore) GetLeasedPartitions(ctx context.Context, topic string, subscriberName string, consumerGroup string) ([]string, error) { - start := time.Now() - success := false - defer func() { - result := "error" - if success { - result = "success" - } - s.metrics.Tagged(map[string]string{"result": result}).Timer("get_leased_partitions.latency").Record(time.Since(start)) - }() +func (s *sqlpartitionLeaseStore) GetLeasedPartitions(ctx context.Context, topic string, subscriberName string, consumerGroup string) (_ []string, retErr error) { + op := metrics.Begin(s.scope, "get_leased_partitions") + defer func() { op.Complete(retErr) }() rows, err := s.db.QueryContext(ctx, fmt.Sprintf(` SELECT partition_key FROM %s @@ -239,12 +165,7 @@ func (s *sqlpartitionLeaseStore) GetLeasedPartitions(ctx context.Context, topic `, PartitionLeasesTableName), consumerGroup, topic, subscriberName) if err != nil { - s.logger.Errorw("failed to get leased partitions", - logTopic, topic, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "query"}).Counter(metricGetLeasedPartitionsErrors).Inc(1) - return nil, fmt.Errorf("failed to get leased partitions: %w", err) + return nil, fmt.Errorf("failed to get leased partitions for topic %s: %w", topic, err) } defer rows.Close() @@ -252,46 +173,30 @@ func (s *sqlpartitionLeaseStore) GetLeasedPartitions(ctx context.Context, topic for rows.Next() { var partition string if err := rows.Scan(&partition); err != nil { - s.logger.Errorw("failed to scan partition", - logTopic, topic, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "scan_partition"}).Counter(metricGetLeasedPartitionsErrors).Inc(1) - return nil, fmt.Errorf("failed to scan partition: %w", err) + return nil, fmt.Errorf("failed to scan partition for topic %s: %w", topic, err) } partitions = append(partitions, partition) } if err := rows.Err(); err != nil { - s.metrics.Tagged(map[string]string{tagErrorType: "row_iteration"}).Counter(metricGetLeasedPartitionsErrors).Inc(1) - return nil, fmt.Errorf("row iteration error: %w", err) + return nil, fmt.Errorf("row iteration error for leased partitions in topic %s: %w", topic, err) } - s.metrics.Counter("get_leased_partitions.success").Inc(1) - s.metrics.Counter("partitions.leased").Inc(int64(len(partitions))) + metrics.NamedCounter(s.scope, "get_leased_partitions", "partitions_leased", int64(len(partitions))) s.logger.Debugw("retrieved leased partitions", logTopic, topic, "count", len(partitions), - "duration_ms", time.Since(start).Milliseconds(), ) - success = true return partitions, nil } // DiscoverAndAcquirePartitions discovers partitions from messages table and tries to acquire leases // Returns the number of new leases acquired // maxPartitions limits how many total partitions this subscriber can own (0 = unlimited) -func (s *sqlpartitionLeaseStore) DiscoverAndAcquirePartitions(ctx context.Context, topic string, subscriberName string, consumerGroup string, leaseDurationMs int64, maxPartitions int) (int, error) { - start := time.Now() - success := false - defer func() { - result := "error" - if success { - result = "success" - } - s.metrics.Tagged(map[string]string{"result": result}).Timer("discover_and_acquire.latency").Record(time.Since(start)) - }() +func (s *sqlpartitionLeaseStore) DiscoverAndAcquirePartitions(ctx context.Context, topic string, subscriberName string, consumerGroup string, leaseDurationMs int64, maxPartitions int) (_ int, retErr error) { + op := metrics.Begin(s.scope, "discover_and_acquire") + defer func() { op.Complete(retErr) }() // Query distinct partition_keys from messages table. // The maxPartitions cap limits how many leases this subscriber acquires, @@ -300,12 +205,7 @@ func (s *sqlpartitionLeaseStore) DiscoverAndAcquirePartitions(ctx context.Contex SELECT DISTINCT partition_key FROM %s WHERE topic = ? ORDER BY partition_key `, MessagesTableName), topic) if err != nil { - s.logger.Errorw("failed to discover partitions", - logTopic, topic, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "query_partitions"}).Counter(metricDiscoverAndAcquireErrors).Inc(1) - return 0, fmt.Errorf("failed to discover partitions: %w", err) + return 0, fmt.Errorf("failed to discover partitions for topic %s: %w", topic, err) } defer rows.Close() @@ -313,19 +213,13 @@ func (s *sqlpartitionLeaseStore) DiscoverAndAcquirePartitions(ctx context.Contex for rows.Next() { var partitionKey string if err := rows.Scan(&partitionKey); err != nil { - s.logger.Errorw("failed to scan partition key", - logTopic, topic, - logError, err, - ) - s.metrics.Tagged(map[string]string{tagErrorType: "scan_partition"}).Counter(metricDiscoverAndAcquireErrors).Inc(1) - return 0, fmt.Errorf("failed to scan partition key: %w", err) + return 0, fmt.Errorf("failed to scan partition key for topic %s: %w", topic, err) } partitions = append(partitions, partitionKey) } if err := rows.Err(); err != nil { - s.metrics.Tagged(map[string]string{tagErrorType: "row_iteration"}).Counter(metricDiscoverAndAcquireErrors).Inc(1) - return 0, fmt.Errorf("row iteration error: %w", err) + return 0, fmt.Errorf("row iteration error for partition discovery in topic %s: %w", topic, err) } s.logger.Debugw("discovered partitions", @@ -372,30 +266,27 @@ func (s *sqlpartitionLeaseStore) DiscoverAndAcquirePartitions(ctx context.Contex } } - s.metrics.Counter("discover_and_acquire.success").Inc(1) - s.metrics.Counter("partitions.discovered").Inc(int64(len(partitions))) - s.metrics.Counter("partitions.acquired").Inc(int64(acquiredCount)) + metrics.NamedCounter(s.scope, "discover_and_acquire", "partitions_discovered", int64(len(partitions))) + metrics.NamedCounter(s.scope, "discover_and_acquire", "partitions_acquired", int64(acquiredCount)) s.logger.Infow("completed partition discovery and acquisition", logTopic, topic, "discovered_count", len(partitions), "acquired_count", acquiredCount, - "duration_ms", time.Since(start).Milliseconds(), ) - success = true return acquiredCount, nil } // DiscoverPartitions returns all distinct partition keys for a topic from the messages table. func (s *sqlpartitionLeaseStore) DiscoverPartitions(ctx context.Context, topic string) (_ []string, retErr error) { - op := coremetrics.Begin(s.metrics, "discover_partitions") + op := metrics.Begin(s.scope, "discover_partitions") defer func() { op.Complete(retErr) }() rows, err := s.db.QueryContext(ctx, fmt.Sprintf(` SELECT DISTINCT partition_key FROM %s WHERE topic = ? ORDER BY partition_key `, MessagesTableName), topic) if err != nil { - return nil, fmt.Errorf("failed to discover partitions: %w", err) + return nil, fmt.Errorf("failed to discover partitions for topic %s: %w", topic, err) } defer rows.Close() @@ -403,13 +294,13 @@ func (s *sqlpartitionLeaseStore) DiscoverPartitions(ctx context.Context, topic s for rows.Next() { var partitionKey string if err := rows.Scan(&partitionKey); err != nil { - return nil, fmt.Errorf("failed to scan partition key: %w", err) + return nil, fmt.Errorf("failed to scan partition key for topic %s: %w", topic, err) } partitions = append(partitions, partitionKey) } if err := rows.Err(); err != nil { - return nil, fmt.Errorf("row iteration error: %w", err) + return nil, fmt.Errorf("row iteration error for partition discovery in topic %s: %w", topic, err) } return partitions, nil diff --git a/extension/queue/mysql/publisher.go b/extension/queue/mysql/publisher.go index 0b86ad0e..d4520821 100644 --- a/extension/queue/mysql/publisher.go +++ b/extension/queue/mysql/publisher.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "go.uber.org/zap" "github.com/uber/submitqueue/entity/queue" @@ -27,42 +28,40 @@ import ( type publisher struct { logger *zap.SugaredLogger - metrics tally.Scope + scope tally.Scope messageStore messageStore mu sync.RWMutex closed bool } // NewPublisher creates a publisher with the given dependencies -func NewPublisher(logger *zap.SugaredLogger, metrics tally.Scope, messageStore messageStore) *publisher { +func NewPublisher(logger *zap.SugaredLogger, scope tally.Scope, messageStore messageStore) *publisher { return &publisher{ - logger: logger, - metrics: metrics, + logger: logger.Named("queue_mysql_publisher"), + scope: scope.SubScope("queue_mysql_publisher"), messageStore: messageStore, } } // Publish sends a message to the specified topic -func (p *publisher) Publish(ctx context.Context, topic string, message queue.Message) error { +func (p *publisher) Publish(ctx context.Context, topic string, message queue.Message) (retErr error) { + op := metrics.Begin(p.scope, "publish") + defer func() { op.Complete(retErr) }() + // Check if closed (under lock) p.mu.RLock() closed := p.closed p.mu.RUnlock() if closed { - p.logger.Errorw("publish failure: publisher is closed", "topic", topic) - return fmt.Errorf("publisher is closed") + return fmt.Errorf("%w for topic: %s", ErrPublisherClosed, topic) } if err := p.messageStore.Insert(ctx, topic, []queue.Message{message}); err != nil { - p.metrics.Tagged(map[string]string{"topic": topic}).Counter("publish_errors").Inc(1) - p.logger.Errorw("publish failure: message store insert error", "topic", topic, "error", err) - return fmt.Errorf("publish message store insert error: %w", err) + return fmt.Errorf("failed to publish message to topic %s: %w", topic, err) } - p.metrics.Tagged(map[string]string{"topic": topic}).Counter("messages_published").Inc(1) - p.logger.Debugw("published message", "topic", topic, "message_id", message.ID) - + p.logger.Debugw("published message", "topic", topic, "message_id", message.ID, "partition_key", message.PartitionKey) return nil } diff --git a/extension/queue/mysql/publisher_test.go b/extension/queue/mysql/publisher_test.go index 5e38b296..85479c29 100644 --- a/extension/queue/mysql/publisher_test.go +++ b/extension/queue/mysql/publisher_test.go @@ -198,7 +198,7 @@ func TestPublisher_PublishMetrics(t *testing.T) { } // Metrics should have been recorded (we're using NoopScope in tests, so just verify no errors) - // In a real implementation, you'd use a mock metrics scope to verify calls + // In a real implementation, you'd use a mock scope scope to verify calls } func TestPublisher_ConcurrentPublish(t *testing.T) { diff --git a/extension/queue/mysql/schema/queue_messages.sql b/extension/queue/mysql/schema/queue_messages.sql index a60851c5..22a2048a 100644 --- a/extension/queue/mysql/schema/queue_messages.sql +++ b/extension/queue/mysql/schema/queue_messages.sql @@ -40,11 +40,16 @@ CREATE TABLE IF NOT EXISTS queue_messages ( last_error TEXT NOT NULL, original_topic VARCHAR(255) NOT NULL, - -- Supports: SELECT ... WHERE topic=? AND partition_key=? AND invisible_until<=? ORDER BY offset - -- Used by subscribers to poll for ready-to-process messages within their assigned partition + -- Hot-path index for the subscriber poll loop (FetchByOffset). + -- Query: SELECT ... WHERE topic=? AND partition_key=? AND offset>? AND invisible_until<=? ORDER BY offset LIMIT ? + -- Also used by DiscoverPartitions: SELECT DISTINCT partition_key WHERE topic=? (leading prefix). INDEX idx_topic_partition_visible_offset (topic, partition_key, invisible_until, offset), - -- Supports: INSERT ... ON DUPLICATE KEY to enforce idempotent publishes - -- Also enables efficient lookups for message updates/deletes by ID + -- Enforces idempotent publishes — prevents duplicate message IDs within a partition. + -- Used by AckMessage: DELETE ... WHERE topic=? AND partition_key=? AND id=? + -- Used by FetchByOffset visibility update: UPDATE ... WHERE topic=? AND partition_key=? AND id IN (...) + -- Used by Delete: DELETE ... WHERE topic=? AND partition_key=? AND id=? + -- Used by MoveToDLQ: SELECT/DELETE ... WHERE topic=? AND partition_key=? AND id=? + -- Used by SetVisibilityTimeout: UPDATE ... WHERE topic=? AND partition_key=? AND id=? UNIQUE KEY idx_topic_partition_id (topic, partition_key, id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; diff --git a/extension/queue/mysql/schema/queue_offsets.sql b/extension/queue/mysql/schema/queue_offsets.sql index 888aa666..6423b7da 100644 --- a/extension/queue/mysql/schema/queue_offsets.sql +++ b/extension/queue/mysql/schema/queue_offsets.sql @@ -18,16 +18,18 @@ CREATE TABLE IF NOT EXISTS queue_offsets ( -- Last update timestamp (epoch milliseconds) updated_at BIGINT UNSIGNED NOT NULL, - -- Primary key ensures each consumer group has one offset per topic/partition - -- Supports: INSERT ... ON DUPLICATE KEY UPDATE for idempotent offset updates - -- Also enables efficient lookups: SELECT ... WHERE consumer_group=? AND topic=? AND partition_key=? + -- Ensures each consumer group has one offset per topic/partition. + -- Used by Initialize: INSERT IGNORE ... (exact PK match for duplicate check) + -- Used by GetAckedOffset: SELECT ... WHERE consumer_group=? AND topic=? AND partition_key=? + -- Used by UpdateAckedOffset: UPDATE ... WHERE consumer_group=? AND topic=? AND partition_key=? + -- Used by AckMessage: INSERT ... ON DUPLICATE KEY UPDATE (exact PK match) + -- Used by admin ListOffsets: SELECT ... WHERE consumer_group=? (PK leading prefix) + -- Note: idx_consumer_group was removed — consumer_group is the leading PK column, + -- so MySQL already uses the PK for WHERE consumer_group=? queries. PRIMARY KEY (consumer_group, topic, partition_key), - -- Supports: SELECT ... WHERE consumer_group=? - -- Used for querying all offsets for a specific consumer group (e.g., for monitoring or rebalancing) - INDEX idx_consumer_group (consumer_group), - - -- Supports: SELECT ... WHERE topic=? - -- Used for querying all consumer groups consuming a specific topic + -- Used by admin queries: SELECT COUNT(DISTINCT consumer_group) WHERE topic=? + -- topic is the second column of the PK, so WHERE topic=? alone cannot use the PK. + -- This index enables admin/monitoring queries to find all consumer groups for a topic. INDEX idx_topic (topic) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; diff --git a/extension/queue/mysql/schema/queue_partition_leases.sql b/extension/queue/mysql/schema/queue_partition_leases.sql index 715b6d03..d63b16a5 100644 --- a/extension/queue/mysql/schema/queue_partition_leases.sql +++ b/extension/queue/mysql/schema/queue_partition_leases.sql @@ -22,16 +22,17 @@ CREATE TABLE IF NOT EXISTS queue_partition_leases ( -- Used to detect stale leases lease_renewed_at BIGINT UNSIGNED NOT NULL, - -- Primary key ensures each partition can only be leased by one worker per consumer group - -- Supports: INSERT ... ON DUPLICATE KEY UPDATE for lease acquisition and renewal - -- Also enables efficient lookups: SELECT ... WHERE consumer_group=? AND topic=? AND partition_key=? + -- Ensures each partition can only be leased by one worker per consumer group. + -- Used by TryAcquireLease: INSERT ... ON DUPLICATE KEY UPDATE (exact PK match) + -- Used by RenewLease: UPDATE ... WHERE consumer_group=? AND topic=? AND partition_key=? AND leased_by=? + -- Used by ReleaseLease: DELETE ... WHERE consumer_group=? AND topic=? AND partition_key=? AND leased_by=? + -- Used by GetLeasedPartitions: SELECT ... WHERE consumer_group=? AND topic=? AND leased_by=? + -- (PK prefix consumer_group, topic narrows the scan; leased_by filtered on remaining rows. + -- Partition count per topic per consumer group is small, so no separate index needed.) + -- Note: idx_leased_by was removed — no query uses WHERE leased_by=? alone. PRIMARY KEY (consumer_group, topic, partition_key), - -- Supports: SELECT ... WHERE leased_by=? - -- Used for querying all partitions owned by a specific worker (e.g., for graceful shutdown or rebalancing) - INDEX idx_leased_by (leased_by), - - -- Supports: SELECT ... WHERE lease_renewed_at0 means deregistered at that time. deregistered_at BIGINT UNSIGNED NOT NULL, + -- Ensures each subscriber has exactly one heartbeat entry per consumer group + topic. + -- Used by Heartbeat: INSERT ... ON DUPLICATE KEY UPDATE (exact PK match for upsert) + -- Used by Deregister: UPDATE ... WHERE consumer_group=? AND topic=? AND subscriber_name=? (exact PK match) + -- Used by ActiveSubscribers: SELECT ... WHERE consumer_group=? AND topic=? AND heartbeat_at>=? AND deregistered_at=0 + -- (PK prefix consumer_group, topic narrows the scan; heartbeat_at and deregistered_at filtered + -- on remaining rows. Subscriber count per topic is small — typically a handful of workers — + -- so no secondary index needed.) PRIMARY KEY (consumer_group, topic, subscriber_name) -); +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; diff --git a/extension/queue/mysql/sql.go b/extension/queue/mysql/sql.go index 50c0abac..f8c91472 100644 --- a/extension/queue/mysql/sql.go +++ b/extension/queue/mysql/sql.go @@ -39,7 +39,7 @@ type Params struct { // Logger for debugging and observability (required) Logger *zap.Logger - // MetricsScope for metrics collection (required) + // MetricsScope for scope collection (required) MetricsScope tally.Scope } diff --git a/extension/queue/mysql/stores.go b/extension/queue/mysql/stores.go index 2d7a933a..a0325b93 100644 --- a/extension/queue/mysql/stores.go +++ b/extension/queue/mysql/stores.go @@ -62,8 +62,8 @@ type messageStore interface { // Insert inserts messages into the topic table Insert(ctx context.Context, topic string, messages []queue.Message) error - // Delete deletes a message by ID - Delete(ctx context.Context, topic string, messageID string) error + // Delete deletes a message by topic, partition key, and ID + Delete(ctx context.Context, topic string, partitionKey string, messageID string) error // FetchByOffset fetches messages with offset > currentOffset for a specific partition // Only fetches visible messages (invisible_until <= now) @@ -73,13 +73,13 @@ type messageStore interface { // MoveToDLQ moves a message to the dead letter queue // dlqTopicSuffix is appended to the original topic to form the DLQ topic name - MoveToDLQ(ctx context.Context, topic string, messageID string, failureCount int, lastError string, dlqTopicSuffix string) error + MoveToDLQ(ctx context.Context, topic string, partitionKey string, messageID string, failureCount int, lastError string, dlqTopicSuffix string) error // SetVisibilityTimeout sets the invisible_until timestamp for a message // visibilityTimeoutMillis: milliseconds from now to hide the message // If visibilityTimeoutMillis is 0, makes the message visible immediately // If visibilityTimeoutMillis > 0, makes the message invisible until now + visibilityTimeoutMillis - SetVisibilityTimeout(ctx context.Context, topic string, messageID string, visibilityTimeoutMillis int64) error + SetVisibilityTimeout(ctx context.Context, topic string, partitionKey string, messageID string, visibilityTimeoutMillis int64) error } // offsetStore handles offset table operations for per-partition offset tracking (internal use only) diff --git a/extension/queue/mysql/subscriber.go b/extension/queue/mysql/subscriber.go index 72d2eacc..4ee71d6b 100644 --- a/extension/queue/mysql/subscriber.go +++ b/extension/queue/mysql/subscriber.go @@ -25,6 +25,7 @@ import ( "github.com/uber-go/tally/v4" "go.uber.org/zap" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity/queue" extqueue "github.com/uber/submitqueue/extension/queue" ) @@ -45,7 +46,7 @@ const ( type subscriber struct { logger *zap.SugaredLogger - metrics tally.Scope + scope tally.Scope messageStore messageStore offsetStore offsetStore leaseStore partitionLeaseStore @@ -191,11 +192,10 @@ func (d *sqlDelivery) Ack(ctx context.Context) error { return err } - // Record metrics - d.subscriber.metrics.Tagged(map[string]string{ - "topic": d.topic, - "partition_key": d.partitionKey, - }).Counter("messages_acked").Inc(1) + metrics.NamedCounter(d.subscriber.scope, "ack", "messages_acked", 1, + metrics.Tag{Key: "topic", Value: d.topic}, + metrics.Tag{Key: "partition_key", Value: d.partitionKey}, + ) d.acknowledged = true return nil @@ -211,21 +211,14 @@ func (d *sqlDelivery) Nack(ctx context.Context, requeueAfterMillis int64) error } // Set visibility timeout to make message visible after requeueAfter duration - if err := d.subscriber.messageStore.SetVisibilityTimeout(ctx, d.topic, d.messageID, requeueAfterMillis); err != nil { - d.subscriber.logger.Errorw("failed to set visibility timeout for nack", - "topic", d.topic, - "partition_key", d.partitionKey, - "message_id", d.messageID, - "error", err, - ) - return err + if err := d.subscriber.messageStore.SetVisibilityTimeout(ctx, d.topic, d.partitionKey, d.messageID, requeueAfterMillis); err != nil { + return fmt.Errorf("failed to set visibility timeout for nack of message %s in topic %s partition %s: %w", d.messageID, d.topic, d.partitionKey, err) } - // Record metrics - d.subscriber.metrics.Tagged(map[string]string{ - "topic": d.topic, - "partition_key": d.partitionKey, - }).Counter("messages_nacked").Inc(1) + metrics.NamedCounter(d.subscriber.scope, "nack", "messages_nacked", 1, + metrics.Tag{Key: "topic", Value: d.topic}, + metrics.Tag{Key: "partition_key", Value: d.partitionKey}, + ) d.subscriber.logger.Infow("message nacked", "topic", d.topic, @@ -250,7 +243,7 @@ func (d *sqlDelivery) Reject(ctx context.Context, reason string) error { if d.dlqConfig.Enabled { // Move to DLQ if err := d.subscriber.messageStore.MoveToDLQ( - ctx, d.topic, d.messageID, d.attempt, reason, d.dlqConfig.TopicSuffix, + ctx, d.topic, d.partitionKey, d.messageID, d.attempt, reason, d.dlqConfig.TopicSuffix, ); err != nil { return fmt.Errorf("failed to move message to DLQ: %w", err) } @@ -260,21 +253,21 @@ func (d *sqlDelivery) Reject(ctx context.Context, reason string) error { ctx, d.topic, d.partitionKey, d.offset, d.consumerGroup, ); err != nil { // Log but don't fail — message is already in DLQ - d.subscriber.logger.Errorw("failed to update offset after DLQ move", + d.subscriber.logger.Warnw("failed to update offset after DLQ move", "topic", d.topic, "message_id", d.messageID, "error", err, ) - d.subscriber.metrics.Tagged(map[string]string{ - "topic": d.topic, - "partition_key": d.partitionKey, - }).Counter("offset_update_after_dlq.errors").Inc(1) + metrics.NamedCounter(d.subscriber.scope, "reject", "offset_update_after_dlq_errors", 1, + metrics.Tag{Key: "topic", Value: d.topic}, + metrics.Tag{Key: "partition_key", Value: d.partitionKey}, + ) } - d.subscriber.metrics.Tagged(map[string]string{ - "topic": d.topic, - "partition_key": d.partitionKey, - }).Counter("messages_rejected_to_dlq").Inc(1) + metrics.NamedCounter(d.subscriber.scope, "reject", "messages_rejected_to_dlq", 1, + metrics.Tag{Key: "topic", Value: d.topic}, + metrics.Tag{Key: "partition_key", Value: d.partitionKey}, + ) } else { // DLQ disabled — fall back to ack (remove from queue) if err := d.subscriber.offsetStore.AckMessage( @@ -283,10 +276,10 @@ func (d *sqlDelivery) Reject(ctx context.Context, reason string) error { return err } - d.subscriber.metrics.Tagged(map[string]string{ - "topic": d.topic, - "partition_key": d.partitionKey, - }).Counter("messages_rejected_no_dlq").Inc(1) + metrics.NamedCounter(d.subscriber.scope, "reject", "messages_rejected_no_dlq", 1, + metrics.Tag{Key: "topic", Value: d.topic}, + metrics.Tag{Key: "partition_key", Value: d.partitionKey}, + ) } d.acknowledged = true @@ -299,28 +292,29 @@ func (d *sqlDelivery) ExtendVisibilityTimeout(ctx context.Context, durationMilli defer d.mu.Unlock() if d.acknowledged { - return fmt.Errorf("delivery %s already acknowledged, cannot extend visibility timeout", d.deliveryID) + return &ErrAlreadyAcknowledged{DeliveryID: d.deliveryID} } - if err := d.subscriber.messageStore.SetVisibilityTimeout(ctx, d.topic, d.messageID, durationMillis); err != nil { + if err := d.subscriber.messageStore.SetVisibilityTimeout(ctx, d.topic, d.partitionKey, d.messageID, durationMillis); err != nil { return err } - // Record metrics - d.subscriber.metrics.Tagged(map[string]string{ - "topic": d.topic, - "partition_key": d.partitionKey, - }).Counter("visibility_extended").Inc(1) + metrics.NamedCounter(d.subscriber.scope, "extend_visibility", "visibility_extended", 1, + metrics.Tag{Key: "topic", Value: d.topic}, + metrics.Tag{Key: "partition_key", Value: d.partitionKey}, + ) return nil } -func NewSubscriber(logger *zap.SugaredLogger, metrics tally.Scope, messageStore messageStore, offsetStore offsetStore, leaseStore partitionLeaseStore, heartbeatStore subscriberHeartbeatStore) *subscriber { +func NewSubscriber(logger *zap.SugaredLogger, scope tally.Scope, messageStore messageStore, offsetStore offsetStore, leaseStore partitionLeaseStore, heartbeatStore subscriberHeartbeatStore) *subscriber { + logger = logger.Named("queue_mysql_subscriber") + scope = scope.SubScope("queue_mysql_subscriber") logger.Info("created subscriber") return &subscriber{ logger: logger, - metrics: metrics, + scope: scope, messageStore: messageStore, offsetStore: offsetStore, leaseStore: leaseStore, @@ -330,14 +324,16 @@ func NewSubscriber(logger *zap.SugaredLogger, metrics tally.Scope, messageStore } // Subscribe starts consuming messages from the specified topic -func (s *subscriber) Subscribe(ctx context.Context, topic string, config extqueue.SubscriptionConfig) (<-chan extqueue.Delivery, error) { +func (s *subscriber) Subscribe(ctx context.Context, topic string, config extqueue.SubscriptionConfig) (_ <-chan extqueue.Delivery, retErr error) { + op := metrics.Begin(s.scope, "subscribe") + defer func() { op.Complete(retErr) }() + s.mu.RLock() closed := s.closed s.mu.RUnlock() if closed { - s.logger.Errorw("subscribe failed: subscriber is closed", "topic", topic) - return nil, fmt.Errorf("subscriber is closed") + return nil, ErrSubscriberClosed } // Create subscription key (topic + consumer group must be unique) @@ -374,8 +370,9 @@ func (s *subscriber) Subscribe(ctx context.Context, topic string, config extqueu s.subscriptions[subKey] = sub - // Track active subscription - s.metrics.Tagged(map[string]string{"topic": topic}).Gauge("active_subscriptions").Update(1) + metrics.NamedGauge(s.scope, "subscribe", "active_subscriptions", 1, + metrics.Tag{Key: "topic", Value: topic}, + ) // Start the supervisor goroutine. It will discover partitions, acquire // leases, and spawn per-partition worker goroutines. The supervisor runs @@ -383,7 +380,6 @@ func (s *subscriber) Subscribe(ctx context.Context, topic string, config extqueu sub.wg.Add(1) go s.managePartitions(subCtx, sub) - s.logger.Debugw("subscription created", "topic", topic, "consumer_group", config.ConsumerGroup, "subscriber_name", config.SubscriberName) return sub.deliveryCh, nil } @@ -458,13 +454,15 @@ func (s *subscriber) discoverAndReconcileWorkers(ctx context.Context, sub *subsc // Discover and try to acquire leases for new partitions acquiredCount, err := s.leaseStore.DiscoverAndAcquirePartitions(ctx, sub.topic, cfg.SubscriberName, cfg.ConsumerGroup, cfg.LeaseDurationMs, maxPartitions) if err == nil && acquiredCount > 0 { - s.metrics.Tagged(map[string]string{"topic": sub.topic}).Counter("leases_acquired").Inc(int64(acquiredCount)) + metrics.NamedCounter(s.scope, "discover_and_reconcile", "leases_acquired", int64(acquiredCount), + metrics.Tag{Key: "topic", Value: sub.topic}, + ) } // Get currently leased partitions leasedPartitions, err := s.leaseStore.GetLeasedPartitions(ctx, sub.topic, cfg.SubscriberName, cfg.ConsumerGroup) if err != nil { - s.logger.Errorw("failed to get leased partitions", "topic", sub.topic, "error", err) + s.logger.Warnw("failed to get leased partitions", "topic", sub.topic, "error", err) return } @@ -635,7 +633,7 @@ func (w *partitionWorker) pollAndDeliver(ctx context.Context) { // Initialize offset for this partition once per worker lifetime if !w.offsetInitialized { if err := s.offsetStore.Initialize(ctx, sub.topic, partitionKey, cfg.ConsumerGroup); err != nil { - s.logger.Errorw("offset initialization failure", "topic", sub.topic, "partition_key", partitionKey, "error", err) + s.logger.Warnw("offset initialization failure", "topic", sub.topic, "partition_key", partitionKey, "error", err) return } w.offsetInitialized = true @@ -644,14 +642,14 @@ func (w *partitionWorker) pollAndDeliver(ctx context.Context) { // Get current offset for this partition currentOffset, err := s.offsetStore.GetAckedOffset(ctx, sub.topic, partitionKey, cfg.ConsumerGroup) if err != nil { - s.logger.Errorw("get current offset failure", "topic", sub.topic, "partition_key", partitionKey, "error", err) + s.logger.Warnw("get current offset failure", "topic", sub.topic, "partition_key", partitionKey, "error", err) return } // Fetch messages for this partition rows, err := s.messageStore.FetchByOffset(ctx, sub.topic, partitionKey, currentOffset, cfg.BatchSize, cfg.VisibilityTimeoutMs) if err != nil { - s.logger.Errorw("fetch messages failure", "topic", sub.topic, "partition_key", partitionKey, "error", err) + s.logger.Warnw("fetch messages failure", "topic", sub.topic, "partition_key", partitionKey, "error", err) return } @@ -669,17 +667,17 @@ func (w *partitionWorker) pollAndDeliver(ctx context.Context) { // Move to DLQ if enabled if cfg.DLQ.Enabled { dlqTopic := sub.topic + cfg.DLQ.TopicSuffix - if err := s.messageStore.MoveToDLQ(ctx, sub.topic, row.ID, row.RetryCount, "exceeded retry limit", cfg.DLQ.TopicSuffix); err != nil { - s.logger.Errorw("failed to move message to DLQ", + if err := s.messageStore.MoveToDLQ(ctx, sub.topic, partitionKey, row.ID, row.RetryCount, "exceeded retry limit", cfg.DLQ.TopicSuffix); err != nil { + s.logger.Warnw("failed to move message to DLQ", "topic", sub.topic, "dlq_topic", dlqTopic, "message_id", row.ID, "error", err, ) - s.metrics.Tagged(map[string]string{ - "topic": sub.topic, - "partition_key": partitionKey, - }).Counter("dlq_move.errors").Inc(1) + metrics.NamedCounter(s.scope, "poll_and_deliver", "dlq_move_errors", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + metrics.Tag{Key: "partition_key", Value: partitionKey}, + ) } else { s.logger.Infow("moved message to DLQ", "topic", sub.topic, @@ -687,23 +685,23 @@ func (w *partitionWorker) pollAndDeliver(ctx context.Context) { "message_id", row.ID, "retry_count", row.RetryCount, ) - s.metrics.Tagged(map[string]string{ - "topic": sub.topic, - "partition_key": partitionKey, - }).Counter("messages_moved_to_dlq").Inc(1) + metrics.NamedCounter(s.scope, "poll_and_deliver", "messages_moved_to_dlq", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + metrics.Tag{Key: "partition_key", Value: partitionKey}, + ) // Update offset since message is now processed (moved to DLQ) if err := s.offsetStore.UpdateAckedOffset(ctx, sub.topic, partitionKey, row.Offset, cfg.ConsumerGroup); err != nil { - s.logger.Errorw("failed to update offset after DLQ move", + s.logger.Warnw("failed to update offset after DLQ move", "topic", sub.topic, "partition_key", partitionKey, "offset", row.Offset, "error", err, ) - s.metrics.Tagged(map[string]string{ - "topic": sub.topic, - "partition_key": partitionKey, - }).Counter("offset_update_after_dlq.errors").Inc(1) + metrics.NamedCounter(s.scope, "poll_and_deliver", "offset_update_after_dlq_errors", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + metrics.Tag{Key: "partition_key", Value: partitionKey}, + ) } } } @@ -714,12 +712,12 @@ func (w *partitionWorker) pollAndDeliver(ctx context.Context) { msg := queue.NewMessage(row.ID, row.Payload, row.PartitionKey, row.Metadata) msg.PublishedAt = row.PublishedAt - // Calculate message age for metrics + // Calculate message age for scope messageAge := time.Duration(time.Now().UnixMilli()-row.PublishedAt) * time.Millisecond - s.metrics.Tagged(map[string]string{ - "topic": sub.topic, - "partition_key": partitionKey, - }).Timer("message_age").Record(messageAge) + metrics.NamedTimer(s.scope, "poll_and_deliver", "message_age", messageAge, + metrics.Tag{Key: "topic", Value: sub.topic}, + metrics.Tag{Key: "partition_key", Value: partitionKey}, + ) // Create delivery ID from offset deliveryID := strconv.FormatInt(row.Offset, 10) @@ -769,15 +767,15 @@ func (w *partitionWorker) pollAndDeliver(ctx context.Context) { } } - // Record metrics + // Record scope if messageCount > 0 { elapsed := time.Since(start) - partitionTags := map[string]string{ - "topic": sub.topic, - "partition_key": partitionKey, + tags := []metrics.Tag{ + {Key: "topic", Value: sub.topic}, + {Key: "partition_key", Value: partitionKey}, } - s.metrics.Tagged(partitionTags).Counter("messages_received").Inc(int64(messageCount)) - s.metrics.Tagged(partitionTags).Timer("poll_latency").Record(elapsed) + metrics.NamedCounter(s.scope, "poll_and_deliver", "messages_received", int64(messageCount), tags...) + metrics.NamedTimer(s.scope, "poll_and_deliver", "poll_latency", elapsed, tags...) s.logger.Debugw("delivered messages", "topic", sub.topic, @@ -793,11 +791,13 @@ func (s *subscriber) renewLeases(ctx context.Context, sub *subscription) { cfg := sub.config leasedPartitions, err := s.leaseStore.GetLeasedPartitions(ctx, sub.topic, cfg.SubscriberName, cfg.ConsumerGroup) if err != nil { - s.logger.Errorw("failed to get leased partitions for renewal", + s.logger.Warnw("failed to get leased partitions for renewal", "topic", sub.topic, "error", err, ) - s.metrics.Tagged(map[string]string{"topic": sub.topic}).Counter("lease_renewal.get_partitions_errors").Inc(1) + metrics.NamedCounter(s.scope, "renew_leases", "get_partitions_errors", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + ) return } @@ -808,10 +808,10 @@ func (s *subscriber) renewLeases(ctx context.Context, sub *subscription) { "partition_key", partitionKey, "error", err, ) - s.metrics.Tagged(map[string]string{ - "topic": sub.topic, - "partition_key": partitionKey, - }).Counter("lease_renewal.renew_errors").Inc(1) + metrics.NamedCounter(s.scope, "renew_leases", "renew_errors", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + metrics.Tag{Key: "partition_key", Value: partitionKey}, + ) } } } @@ -821,7 +821,7 @@ func (s *subscriber) releaseAllLeases(ctx context.Context, sub *subscription) { cfg := sub.config leasedPartitions, err := s.leaseStore.GetLeasedPartitions(ctx, sub.topic, cfg.SubscriberName, cfg.ConsumerGroup) if err != nil { - s.logger.Errorw("failed to get leased partitions for release", + s.logger.Warnw("failed to get leased partitions for release", "topic", sub.topic, "error", err, ) @@ -835,10 +835,10 @@ func (s *subscriber) releaseAllLeases(ctx context.Context, sub *subscription) { "partition_key", partitionKey, "error", err, ) - s.metrics.Tagged(map[string]string{ - "topic": sub.topic, - "partition_key": partitionKey, - }).Counter("lease_release.errors").Inc(1) + metrics.NamedCounter(s.scope, "release_all_leases", "release_errors", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + metrics.Tag{Key: "partition_key", Value: partitionKey}, + ) } } } @@ -854,10 +854,14 @@ func (s *subscriber) sendHeartbeat(ctx context.Context, sub *subscription) { "topic", sub.topic, "error", err, ) - s.metrics.Tagged(map[string]string{"topic": sub.topic}).Counter("heartbeat.errors").Inc(1) + metrics.NamedCounter(s.scope, "heartbeat", "errors", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + ) return } - s.metrics.Tagged(map[string]string{"topic": sub.topic}).Counter("heartbeats_sent").Inc(1) + metrics.NamedCounter(s.scope, "heartbeat", "sent", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + ) } // deregisterHeartbeat removes this subscriber's heartbeat entry during shutdown. @@ -869,10 +873,14 @@ func (s *subscriber) deregisterHeartbeat(ctx context.Context, sub *subscription) "topic", sub.topic, "error", err, ) - s.metrics.Tagged(map[string]string{"topic": sub.topic}).Counter("heartbeat.deregister_errors").Inc(1) + metrics.NamedCounter(s.scope, "heartbeat", "deregister_errors", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + ) return } - s.metrics.Tagged(map[string]string{"topic": sub.topic}).Counter("heartbeat.deregistrations").Inc(1) + metrics.NamedCounter(s.scope, "heartbeat", "deregistrations", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + ) } // computeMaxPartitions returns the maximum number of partitions this subscriber @@ -890,7 +898,9 @@ func (s *subscriber) computeMaxPartitions(ctx context.Context, sub *subscription "topic", sub.topic, "error", err, ) - s.metrics.Tagged(map[string]string{"topic": sub.topic}).Counter("fair_share_cap.errors").Inc(1) + metrics.NamedCounter(s.scope, "fair_share_cap", "errors", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + ) return 0 } return maxPart @@ -907,7 +917,9 @@ func (s *subscriber) rebalance(ctx context.Context, sub *subscription) { "topic", sub.topic, "error", err, ) - s.metrics.Tagged(map[string]string{"topic": sub.topic}).Counter("fair_share_cap.errors").Inc(1) + metrics.NamedCounter(s.scope, "fair_share_cap", "errors", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + ) return } if maxPart == 0 || len(owned) <= maxPart { @@ -925,10 +937,10 @@ func (s *subscriber) rebalance(ctx context.Context, sub *subscription) { "partition_key", pk, "error", err, ) - s.metrics.Tagged(map[string]string{ - "topic": sub.topic, - "partition_key": pk, - }).Counter("rebalance.release_errors").Inc(1) + metrics.NamedCounter(s.scope, "rebalance", "release_errors", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + metrics.Tag{Key: "partition_key", Value: pk}, + ) continue } @@ -941,9 +953,9 @@ func (s *subscriber) rebalance(ctx context.Context, sub *subscription) { "owned", len(owned), "max_partitions", maxPart, ) - s.metrics.Tagged(map[string]string{ - "topic": sub.topic, - }).Counter("rebalance.partitions_released").Inc(1) + metrics.NamedCounter(s.scope, "rebalance", "partitions_released", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + ) } } @@ -961,7 +973,9 @@ func (s *subscriber) fairShareCap(ctx context.Context, sub *subscription) (int, } activeSubscribers := len(active) - s.metrics.Tagged(map[string]string{"topic": sub.topic}).Gauge("active_subscribers").Update(float64(activeSubscribers)) + metrics.NamedGauge(s.scope, "fair_share_cap", "active_subscribers", float64(activeSubscribers), + metrics.Tag{Key: "topic", Value: sub.topic}, + ) owned, err := s.leaseStore.GetLeasedPartitions(ctx, sub.topic, cfg.SubscriberName, cfg.ConsumerGroup) if err != nil { @@ -981,7 +995,9 @@ func (s *subscriber) fairShareCap(ctx context.Context, sub *subscription) (int, "topic", sub.topic, "error", err, ) - s.metrics.Tagged(map[string]string{"topic": sub.topic}).Counter("discover_partitions.errors").Inc(1) + metrics.NamedCounter(s.scope, "fair_share_cap", "discover_partitions_errors", 1, + metrics.Tag{Key: "topic", Value: sub.topic}, + ) } else { for _, pk := range allPartitions { partitionSet[pk] = struct{}{} @@ -1006,7 +1022,10 @@ func (s *subscriber) fairShareCap(ctx context.Context, sub *subscription) (int, // Close() does not block indefinitely if a subscription hangs // 3. managePartitions internally handles stopping workers and closing deliveryCh // (see managePartitions shutdown sequence) -func (s *subscriber) Close() error { +func (s *subscriber) Close() (retErr error) { + op := metrics.Begin(s.scope, "close") + defer func() { op.Complete(retErr) }() + s.mu.Lock() defer s.mu.Unlock() @@ -1041,8 +1060,9 @@ func (s *subscriber) Close() error { s.logger.Warnw("subscription shutdown timeout", "topic", topic) } - // Update metrics - s.metrics.Tagged(map[string]string{"topic": topic}).Gauge("active_subscriptions").Update(0) + metrics.NamedGauge(s.scope, "close", "active_subscriptions", 0, + metrics.Tag{Key: "topic", Value: topic}, + ) } s.subscriptions = make(map[string]*subscription) diff --git a/extension/queue/mysql/subscriber_heartbeat_store.go b/extension/queue/mysql/subscriber_heartbeat_store.go index 68316a25..0e17d782 100644 --- a/extension/queue/mysql/subscriber_heartbeat_store.go +++ b/extension/queue/mysql/subscriber_heartbeat_store.go @@ -21,7 +21,7 @@ import ( "time" "github.com/uber-go/tally/v4" - coremetrics "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/core/metrics" "go.uber.org/zap" ) @@ -29,21 +29,21 @@ import ( type sqlSubscriberHeartbeatStore struct { db *sql.DB logger *zap.SugaredLogger - metrics tally.Scope + scope tally.Scope } // newSubscriberHeartbeatStore creates a new SQL subscriber heartbeat store func newSubscriberHeartbeatStore(db *sql.DB, logger *zap.Logger, scope tally.Scope) subscriberHeartbeatStore { return &sqlSubscriberHeartbeatStore{ db: db, - logger: logger.Sugar().Named("subscriber_heartbeat_store"), - metrics: scope.SubScope("subscriber_heartbeat_store"), + logger: logger.Sugar().Named("queue_mysql_subscriber_heartbeat_store"), + scope: scope.SubScope("queue_mysql_subscriber_heartbeat_store"), } } // Heartbeat registers or renews a subscriber's heartbeat func (s *sqlSubscriberHeartbeatStore) Heartbeat(ctx context.Context, topic string, subscriberName string, consumerGroup string) (retErr error) { - op := coremetrics.Begin(s.metrics, "heartbeat") + op := metrics.Begin(s.scope, "heartbeat") defer func() { op.Complete(retErr) }() now := time.Now().UnixMilli() @@ -63,7 +63,7 @@ func (s *sqlSubscriberHeartbeatStore) Heartbeat(ctx context.Context, topic strin // ActiveSubscribers returns the names of subscribers with a heartbeat newer than the stale threshold. func (s *sqlSubscriberHeartbeatStore) ActiveSubscribers(ctx context.Context, topic string, consumerGroup string, staleDurationMs int64) (_ []string, retErr error) { - op := coremetrics.Begin(s.metrics, "active_subscribers") + op := metrics.Begin(s.scope, "active_subscribers") defer func() { op.Complete(retErr) }() staleThreshold := time.Now().UnixMilli() - staleDurationMs @@ -102,7 +102,7 @@ func (s *sqlSubscriberHeartbeatStore) ActiveSubscribers(ctx context.Context, top // Deregister soft-deletes a subscriber's heartbeat entry by setting deregistered_at. // Idempotent: no-op if already deregistered. func (s *sqlSubscriberHeartbeatStore) Deregister(ctx context.Context, topic string, subscriberName string, consumerGroup string) (retErr error) { - op := coremetrics.Begin(s.metrics, "deregister") + op := metrics.Begin(s.scope, "deregister") defer func() { op.Complete(retErr) }() now := time.Now().UnixMilli() diff --git a/extension/queue/mysql/subscriber_test.go b/extension/queue/mysql/subscriber_test.go index 055acd0f..c8ffedd6 100644 --- a/extension/queue/mysql/subscriber_test.go +++ b/extension/queue/mysql/subscriber_test.go @@ -179,7 +179,7 @@ func TestSQLDelivery_Reject(t *testing.T) { if tt.expectMoveDLQ { mockMsgStore.EXPECT().MoveToDLQ( - gomock.Any(), "test_topic", "msg-1", 1, "bad payload", "_dlq", + gomock.Any(), "test_topic", "part-1", "msg-1", 1, "bad payload", "_dlq", ).Return(tt.moveToDLQErr) if tt.moveToDLQErr == nil {