|
| 1 | +// Copyright (c) 2025 Uber Technologies, Inc. |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +// you may not use this file except in compliance with the License. |
| 5 | +// You may obtain a copy of the License at |
| 6 | +// |
| 7 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +// |
| 9 | +// Unless required by applicable law or agreed to in writing, software |
| 10 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +// See the License for the specific language governing permissions and |
| 13 | +// limitations under the License. |
| 14 | + |
| 15 | +package mysql |
| 16 | + |
| 17 | +import ( |
| 18 | + "context" |
| 19 | + "database/sql" |
| 20 | + "fmt" |
| 21 | + "time" |
| 22 | + |
| 23 | + "github.com/uber-go/tally/v4" |
| 24 | + "github.com/uber/submitqueue/core/metrics" |
| 25 | + "go.uber.org/zap" |
| 26 | +) |
| 27 | + |
| 28 | +// sqldeliveryStateStore is the SQL implementation of deliveryStateStore |
| 29 | +type sqldeliveryStateStore struct { |
| 30 | + db *sql.DB |
| 31 | + logger *zap.SugaredLogger |
| 32 | + scope tally.Scope |
| 33 | +} |
| 34 | + |
| 35 | +// newDeliveryStateStore creates a new SQL delivery state store |
| 36 | +func newDeliveryStateStore(db *sql.DB, logger *zap.SugaredLogger, scope tally.Scope) deliveryStateStore { |
| 37 | + return &sqldeliveryStateStore{ |
| 38 | + db: db, |
| 39 | + logger: logger.Named("delivery_state_store"), |
| 40 | + scope: scope.SubScope("delivery_state_store"), |
| 41 | + } |
| 42 | +} |
| 43 | + |
| 44 | +// MarkDelivered inserts a row marking message as in-flight for this consumer group. |
| 45 | +func (s *sqldeliveryStateStore) MarkDelivered(ctx context.Context, consumerGroup, topic, partitionKey string, offset int64, visibilityTimeoutMs int64) (retErr error) { |
| 46 | + op := metrics.Begin(s.scope, "mark_delivered", |
| 47 | + metrics.NewTag("topic", topic), |
| 48 | + metrics.NewTag("consumer_group", consumerGroup), |
| 49 | + metrics.NewTag("partition_key", partitionKey)) |
| 50 | + defer func() { op.Complete(retErr) }() |
| 51 | + |
| 52 | + now := time.Now().UnixMilli() |
| 53 | + invisibleUntil := now + visibilityTimeoutMs |
| 54 | + |
| 55 | + _, err := s.db.ExecContext(ctx, fmt.Sprintf(` |
| 56 | + INSERT INTO %s (consumer_group, topic, partition_key, message_offset, acked, invisible_until, retry_count) |
| 57 | + VALUES (?, ?, ?, ?, FALSE, ?, 0) |
| 58 | + ON DUPLICATE KEY UPDATE |
| 59 | + invisible_until = IF(acked = FALSE, VALUES(invisible_until), invisible_until), |
| 60 | + retry_count = IF(acked = FALSE, retry_count + 1, retry_count) |
| 61 | + `, DeliveryStateTableName), |
| 62 | + consumerGroup, topic, partitionKey, offset, invisibleUntil) |
| 63 | + |
| 64 | + if err != nil { |
| 65 | + return fmt.Errorf("mark delivered topic=%s partition=%s offset=%d: %w", topic, partitionKey, offset, err) |
| 66 | + } |
| 67 | + |
| 68 | + return nil |
| 69 | +} |
| 70 | + |
| 71 | +// ExtendVisibility extends the visibility timeout for an in-flight message |
| 72 | +// without incrementing retry_count. Used by ExtendVisibilityTimeout. |
| 73 | +func (s *sqldeliveryStateStore) ExtendVisibility(ctx context.Context, consumerGroup, topic, partitionKey string, offset int64, visibilityTimeoutMs int64) (retErr error) { |
| 74 | + op := metrics.Begin(s.scope, "extend_visibility", |
| 75 | + metrics.NewTag("topic", topic), |
| 76 | + metrics.NewTag("consumer_group", consumerGroup), |
| 77 | + metrics.NewTag("partition_key", partitionKey)) |
| 78 | + defer func() { op.Complete(retErr) }() |
| 79 | + |
| 80 | + now := time.Now().UnixMilli() |
| 81 | + invisibleUntil := now + visibilityTimeoutMs |
| 82 | + |
| 83 | + _, err := s.db.ExecContext(ctx, fmt.Sprintf(` |
| 84 | + UPDATE %s |
| 85 | + SET invisible_until = ? |
| 86 | + WHERE consumer_group = ? AND topic = ? AND partition_key = ? AND message_offset = ? AND acked = FALSE |
| 87 | + `, DeliveryStateTableName), |
| 88 | + invisibleUntil, consumerGroup, topic, partitionKey, offset) |
| 89 | + |
| 90 | + if err != nil { |
| 91 | + return fmt.Errorf("extend visibility topic=%s partition=%s offset=%d: %w", topic, partitionKey, offset, err) |
| 92 | + } |
| 93 | + |
| 94 | + return nil |
| 95 | +} |
| 96 | + |
| 97 | +// MarkAcked sets acked = TRUE to indicate this group has processed the message. |
| 98 | +func (s *sqldeliveryStateStore) MarkAcked(ctx context.Context, consumerGroup, topic, partitionKey string, offset int64) (retErr error) { |
| 99 | + op := metrics.Begin(s.scope, "mark_acked", |
| 100 | + metrics.NewTag("topic", topic), |
| 101 | + metrics.NewTag("consumer_group", consumerGroup), |
| 102 | + metrics.NewTag("partition_key", partitionKey)) |
| 103 | + defer func() { op.Complete(retErr) }() |
| 104 | + |
| 105 | + _, err := s.db.ExecContext(ctx, fmt.Sprintf(` |
| 106 | + INSERT INTO %s (consumer_group, topic, partition_key, message_offset, acked, invisible_until, retry_count) |
| 107 | + VALUES (?, ?, ?, ?, TRUE, 0, 0) |
| 108 | + ON DUPLICATE KEY UPDATE acked = TRUE |
| 109 | + `, DeliveryStateTableName), |
| 110 | + consumerGroup, topic, partitionKey, offset) |
| 111 | + |
| 112 | + if err != nil { |
| 113 | + return fmt.Errorf("mark acked topic=%s partition=%s offset=%d: %w", topic, partitionKey, offset, err) |
| 114 | + } |
| 115 | + |
| 116 | + return nil |
| 117 | +} |
| 118 | + |
| 119 | +// MarkNacked sets invisible_until = now + delay to schedule redelivery. |
| 120 | +// retry_count is NOT incremented here — it is incremented by MarkDelivered on redelivery. |
| 121 | +func (s *sqldeliveryStateStore) MarkNacked(ctx context.Context, consumerGroup, topic, partitionKey string, offset int64, delayMs int64) (retErr error) { |
| 122 | + op := metrics.Begin(s.scope, "mark_nacked", |
| 123 | + metrics.NewTag("topic", topic), |
| 124 | + metrics.NewTag("consumer_group", consumerGroup), |
| 125 | + metrics.NewTag("partition_key", partitionKey)) |
| 126 | + defer func() { op.Complete(retErr) }() |
| 127 | + |
| 128 | + now := time.Now().UnixMilli() |
| 129 | + invisibleUntil := now + delayMs |
| 130 | + |
| 131 | + _, err := s.db.ExecContext(ctx, fmt.Sprintf(` |
| 132 | + INSERT INTO %s (consumer_group, topic, partition_key, message_offset, acked, invisible_until, retry_count) |
| 133 | + VALUES (?, ?, ?, ?, FALSE, ?, 0) |
| 134 | + ON DUPLICATE KEY UPDATE |
| 135 | + invisible_until = VALUES(invisible_until) |
| 136 | + `, DeliveryStateTableName), |
| 137 | + consumerGroup, topic, partitionKey, offset, invisibleUntil) |
| 138 | + |
| 139 | + if err != nil { |
| 140 | + return fmt.Errorf("mark nacked topic=%s partition=%s offset=%d: %w", topic, partitionKey, offset, err) |
| 141 | + } |
| 142 | + |
| 143 | + return nil |
| 144 | +} |
| 145 | + |
| 146 | +// GetRetryCount returns the retry count for a specific message and consumer group. |
| 147 | +func (s *sqldeliveryStateStore) GetRetryCount(ctx context.Context, consumerGroup, topic, partitionKey string, offset int64) (_ int, retErr error) { |
| 148 | + op := metrics.Begin(s.scope, "get_retry_count", |
| 149 | + metrics.NewTag("topic", topic), |
| 150 | + metrics.NewTag("consumer_group", consumerGroup), |
| 151 | + metrics.NewTag("partition_key", partitionKey)) |
| 152 | + defer func() { op.Complete(retErr) }() |
| 153 | + |
| 154 | + var retryCount int |
| 155 | + err := s.db.QueryRowContext(ctx, fmt.Sprintf(` |
| 156 | + SELECT retry_count FROM %s |
| 157 | + WHERE consumer_group = ? AND topic = ? AND partition_key = ? AND message_offset = ? |
| 158 | + `, DeliveryStateTableName), consumerGroup, topic, partitionKey, offset).Scan(&retryCount) |
| 159 | + |
| 160 | + if err == sql.ErrNoRows { |
| 161 | + return 0, nil |
| 162 | + } |
| 163 | + if err != nil { |
| 164 | + return 0, fmt.Errorf("get retry count topic=%s partition=%s offset=%d: %w", topic, partitionKey, offset, err) |
| 165 | + } |
| 166 | + |
| 167 | + return retryCount, nil |
| 168 | +} |
| 169 | + |
| 170 | +// IsDeliverable checks if a message offset is deliverable for this consumer group. |
| 171 | +// |
| 172 | +// Deliverability is determined by the acked flag and invisible_until timestamp: |
| 173 | +// - No row (never delivered): deliverable |
| 174 | +// - acked = TRUE: not deliverable (already processed) |
| 175 | +// - acked = FALSE, invisible_until <= now (expired): deliverable (ready for retry) |
| 176 | +// - acked = FALSE, invisible_until > now (pending): not deliverable (in-flight or nack delay) |
| 177 | +func (s *sqldeliveryStateStore) IsDeliverable(ctx context.Context, consumerGroup, topic, partitionKey string, offset int64) (_ bool, retErr error) { |
| 178 | + op := metrics.Begin(s.scope, "is_deliverable", |
| 179 | + metrics.NewTag("topic", topic), |
| 180 | + metrics.NewTag("consumer_group", consumerGroup), |
| 181 | + metrics.NewTag("partition_key", partitionKey)) |
| 182 | + defer func() { op.Complete(retErr) }() |
| 183 | + |
| 184 | + now := time.Now().UnixMilli() |
| 185 | + |
| 186 | + var acked bool |
| 187 | + var invisibleUntil uint64 |
| 188 | + err := s.db.QueryRowContext(ctx, fmt.Sprintf(` |
| 189 | + SELECT acked, invisible_until FROM %s |
| 190 | + WHERE consumer_group = ? AND topic = ? AND partition_key = ? AND message_offset = ? |
| 191 | + `, DeliveryStateTableName), consumerGroup, topic, partitionKey, offset).Scan(&acked, &invisibleUntil) |
| 192 | + |
| 193 | + if err == sql.ErrNoRows { |
| 194 | + // No delivery state row -> never delivered -> deliverable |
| 195 | + return true, nil |
| 196 | + } |
| 197 | + if err != nil { |
| 198 | + return false, fmt.Errorf("check deliverability topic=%s partition=%s offset=%d: %w", topic, partitionKey, offset, err) |
| 199 | + } |
| 200 | + |
| 201 | + // Already processed by this consumer group — never redeliver |
| 202 | + if acked { |
| 203 | + return false, nil |
| 204 | + } |
| 205 | + |
| 206 | + // Message is deliverable if invisible_until has passed |
| 207 | + return invisibleUntil <= uint64(now), nil |
| 208 | +} |
| 209 | + |
| 210 | +// AdvanceWatermark computes the new contiguous acked watermark and cleans up |
| 211 | +// delivery state rows that are behind it. |
| 212 | +// offsets are the actual message offsets above the current watermark (from messageStore). |
| 213 | +// Returns the new watermark (highest contiguous acked offset from currentWatermark). |
| 214 | +func (s *sqldeliveryStateStore) AdvanceWatermark(ctx context.Context, consumerGroup, topic, partitionKey string, currentWatermark int64, offsets []int64) (_ int64, retErr error) { |
| 215 | + op := metrics.Begin(s.scope, "advance_watermark", |
| 216 | + metrics.NewTag("topic", topic), |
| 217 | + metrics.NewTag("consumer_group", consumerGroup), |
| 218 | + metrics.NewTag("partition_key", partitionKey)) |
| 219 | + defer func() { op.Complete(retErr) }() |
| 220 | + |
| 221 | + if len(offsets) == 0 { |
| 222 | + return currentWatermark, nil |
| 223 | + } |
| 224 | + |
| 225 | + // Batch-fetch delivery state for the provided offsets. |
| 226 | + placeholders := make([]byte, 0, len(offsets)*2-1) |
| 227 | + args := make([]interface{}, 0, 3+len(offsets)) |
| 228 | + args = append(args, consumerGroup, topic, partitionKey) |
| 229 | + for i, offset := range offsets { |
| 230 | + if i > 0 { |
| 231 | + placeholders = append(placeholders, ',') |
| 232 | + } |
| 233 | + placeholders = append(placeholders, '?') |
| 234 | + args = append(args, offset) |
| 235 | + } |
| 236 | + |
| 237 | + rows, err := s.db.QueryContext(ctx, fmt.Sprintf(` |
| 238 | + SELECT message_offset, acked FROM %s |
| 239 | + WHERE consumer_group = ? AND topic = ? AND partition_key = ? |
| 240 | + AND message_offset IN (%s) |
| 241 | + `, DeliveryStateTableName, string(placeholders)), args...) |
| 242 | + if err != nil { |
| 243 | + return currentWatermark, fmt.Errorf("query delivery state for watermark topic=%s partition=%s: %w", topic, partitionKey, err) |
| 244 | + } |
| 245 | + defer rows.Close() |
| 246 | + |
| 247 | + // Build lookup map: offset -> acked |
| 248 | + ackedMap := make(map[int64]bool, len(offsets)) |
| 249 | + for rows.Next() { |
| 250 | + var offset int64 |
| 251 | + var acked bool |
| 252 | + if err := rows.Scan(&offset, &acked); err != nil { |
| 253 | + return currentWatermark, fmt.Errorf("scan delivery state topic=%s partition=%s: %w", topic, partitionKey, err) |
| 254 | + } |
| 255 | + ackedMap[offset] = acked |
| 256 | + } |
| 257 | + if err := rows.Err(); err != nil { |
| 258 | + return currentWatermark, fmt.Errorf("delivery state iteration topic=%s partition=%s: %w", topic, partitionKey, err) |
| 259 | + } |
| 260 | + |
| 261 | + // Walk message offsets in order. Advance while contiguous acked. |
| 262 | + // Stop at first offset that is not acked (in-flight, nacked, or undelivered). |
| 263 | + newWatermark := currentWatermark |
| 264 | + for _, offset := range offsets { |
| 265 | + acked, exists := ackedMap[offset] |
| 266 | + if !exists || !acked { |
| 267 | + // No delivery state (undelivered) or not acked — stop |
| 268 | + break |
| 269 | + } |
| 270 | + newWatermark = offset |
| 271 | + } |
| 272 | + |
| 273 | + // Cleanup error is swallowed because the watermark was already computed and |
| 274 | + // will be returned to the caller. The stale delivery state rows behind the |
| 275 | + // watermark are harmless — they are never read again (all queries use |
| 276 | + // offset > watermark). Cleanup is retried on the next AdvanceWatermark call. |
| 277 | + if newWatermark > currentWatermark { |
| 278 | + _, err := s.db.ExecContext(ctx, fmt.Sprintf(` |
| 279 | + DELETE FROM %s |
| 280 | + WHERE consumer_group = ? AND topic = ? AND partition_key = ? AND message_offset <= ? |
| 281 | + `, DeliveryStateTableName), consumerGroup, topic, partitionKey, newWatermark) |
| 282 | + if err != nil { |
| 283 | + metrics.NamedCounter(s.scope, "advance_watermark", "cleanup_errors", 1, |
| 284 | + metrics.NewTag("topic", topic)) |
| 285 | + s.logger.Warnw("failed to clean up delivery state behind watermark", |
| 286 | + logTopic, topic, |
| 287 | + logPartitionKey, partitionKey, |
| 288 | + "watermark", newWatermark, |
| 289 | + logError, err, |
| 290 | + ) |
| 291 | + } |
| 292 | + } |
| 293 | + |
| 294 | + return newWatermark, nil |
| 295 | +} |
0 commit comments