|
| 1 | +// Copyright (c) 2025 Uber Technologies, Inc. |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +// you may not use this file except in compliance with the License. |
| 5 | +// You may obtain a copy of the License at |
| 6 | +// |
| 7 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +// |
| 9 | +// Unless required by applicable law or agreed to in writing, software |
| 10 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +// See the License for the specific language governing permissions and |
| 13 | +// limitations under the License. |
| 14 | + |
| 15 | +package mysql |
| 16 | + |
| 17 | +import ( |
| 18 | + "context" |
| 19 | + "database/sql" |
| 20 | + "fmt" |
| 21 | + "strings" |
| 22 | + |
| 23 | + "github.com/uber-go/tally/v4" |
| 24 | + |
| 25 | + "github.com/uber/submitqueue/core/metrics" |
| 26 | + "github.com/uber/submitqueue/entity" |
| 27 | + "github.com/uber/submitqueue/extension/changestore" |
| 28 | +) |
| 29 | + |
| 30 | +type changeStore struct { |
| 31 | + db *sql.DB |
| 32 | + scope tally.Scope |
| 33 | +} |
| 34 | + |
| 35 | +// NewChangeStore creates a new MySQL-backed ChangeStore. |
| 36 | +func NewChangeStore(db *sql.DB, scope tally.Scope) changestore.ChangeStore { |
| 37 | + return &changeStore{db: db, scope: scope} |
| 38 | +} |
| 39 | + |
| 40 | +// Create inserts a batch of ChangeRecords. Primary-key conflicts on (uri, request_id) |
| 41 | +// are silently ignored via INSERT IGNORE so queue-redelivery of the same request is a no-op. |
| 42 | +func (s *changeStore) Create(ctx context.Context, records []entity.ChangeRecord) (retErr error) { |
| 43 | + op := metrics.Begin(s.scope, "create") |
| 44 | + defer func() { op.Complete(retErr) }() |
| 45 | + |
| 46 | + if len(records) == 0 { |
| 47 | + return nil |
| 48 | + } |
| 49 | + |
| 50 | + const cols = 6 |
| 51 | + placeholders := strings.Repeat("(?, ?, ?, ?, ?, ?), ", len(records)) |
| 52 | + placeholders = placeholders[:len(placeholders)-2] // trim trailing ", " |
| 53 | + |
| 54 | + args := make([]any, 0, len(records)*cols) |
| 55 | + for _, r := range records { |
| 56 | + // Pass empty Metadata as NULL — JSON column rejects empty string but accepts NULL. |
| 57 | + var metadata any |
| 58 | + if r.Metadata != "" { |
| 59 | + metadata = r.Metadata |
| 60 | + } |
| 61 | + args = append(args, r.URI, r.RequestID, r.Queue, metadata, r.CreatedAt, r.UpdatedAt) |
| 62 | + } |
| 63 | + |
| 64 | + query := "INSERT IGNORE INTO `change` (uri, request_id, queue, metadata, created_at, updated_at) VALUES " + placeholders |
| 65 | + if _, err := s.db.ExecContext(ctx, query, args...); err != nil { |
| 66 | + return fmt.Errorf("failed to insert change records (count=%d): %w", len(records), err) |
| 67 | + } |
| 68 | + return nil |
| 69 | +} |
| 70 | + |
| 71 | +// FindOverlapping returns ChangeRecords whose uri is in the given set, scoped to queue, |
| 72 | +// excluding any belonging to excludeRequestID. |
| 73 | +func (s *changeStore) FindOverlapping( |
| 74 | + ctx context.Context, |
| 75 | + queue string, |
| 76 | + uris []string, |
| 77 | + excludeRequestID string, |
| 78 | +) (ret []entity.ChangeRecord, retErr error) { |
| 79 | + op := metrics.Begin(s.scope, "find_overlapping") |
| 80 | + defer func() { op.Complete(retErr) }() |
| 81 | + |
| 82 | + if len(uris) == 0 { |
| 83 | + return nil, nil |
| 84 | + } |
| 85 | + |
| 86 | + uriPlaceholders := "?" + strings.Repeat(", ?", len(uris)-1) |
| 87 | + query := "SELECT uri, request_id, queue, metadata, created_at, updated_at FROM `change` " + |
| 88 | + "WHERE uri IN (" + uriPlaceholders + ") AND queue = ? AND request_id != ?" |
| 89 | + |
| 90 | + args := make([]any, 0, len(uris)+2) |
| 91 | + for _, u := range uris { |
| 92 | + args = append(args, u) |
| 93 | + } |
| 94 | + args = append(args, queue, excludeRequestID) |
| 95 | + |
| 96 | + rows, err := s.db.QueryContext(ctx, query, args...) |
| 97 | + if err != nil { |
| 98 | + return nil, fmt.Errorf("failed to query overlapping changes for queue=%s: %w", queue, err) |
| 99 | + } |
| 100 | + defer rows.Close() |
| 101 | + |
| 102 | + var results []entity.ChangeRecord |
| 103 | + for rows.Next() { |
| 104 | + var rec entity.ChangeRecord |
| 105 | + var metadata sql.NullString |
| 106 | + if err := rows.Scan(&rec.URI, &rec.RequestID, &rec.Queue, &metadata, &rec.CreatedAt, &rec.UpdatedAt); err != nil { |
| 107 | + return nil, fmt.Errorf("failed to scan change record for queue=%s: %w", queue, err) |
| 108 | + } |
| 109 | + if metadata.Valid { |
| 110 | + rec.Metadata = metadata.String |
| 111 | + } |
| 112 | + results = append(results, rec) |
| 113 | + } |
| 114 | + if err := rows.Err(); err != nil { |
| 115 | + return nil, fmt.Errorf("failed to iterate change records for queue=%s: %w", queue, err) |
| 116 | + } |
| 117 | + return results, nil |
| 118 | +} |
0 commit comments