Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion extension/queue/mysql/message_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@ func newMessageStore(db *sql.DB, logger *zap.SugaredLogger, scope tally.Scope) m
}
}

// Insert inserts messages into the messages table
// Insert inserts messages into the messages table.
//
// Publishes are idempotent on the (topic, partition_key, id) unique key: a
// repeated publish for the same key is silently treated as success and does
// not overwrite the original payload. This matches the queue_messages schema's
// documented intent ("Supports: INSERT ... ON DUPLICATE KEY to enforce
// idempotent publishes") and lets callers safely retry publishes (e.g. a
// second Cancel RPC for the same request) without surfacing 1062 duplicate-key
// errors.
func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []queue.Message) (retErr error) {
op := metrics.Begin(s.scope, "insert", metrics.NewTag("topic", topic))
defer func() { op.Complete(retErr) }()
Expand All @@ -64,9 +72,12 @@ func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []q
}
defer tx.Rollback()

// ON DUPLICATE KEY UPDATE topic=topic is a no-op write that makes MySQL
// swallow the unique-key violation without mutating the existing row.
stmt, err := tx.PrepareContext(ctx, fmt.Sprintf(`
INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, failed_at, failure_count, last_error, original_topic)
VALUES (?, ?, ?, ?, ?, ?, ?, 0, 0, '', '')
ON DUPLICATE KEY UPDATE topic = topic
`, MessagesTableName))
if err != nil {
return fmt.Errorf("prepare statement topic=%s: %w", topic, err)
Expand Down
17 changes: 17 additions & 0 deletions extension/queue/mysql/message_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,23 @@ func TestMessageStore_Insert(t *testing.T) {
setup: func(mock sqlmock.Sqlmock, messages []queue.Message) {},
wantErr: false,
},
{
// Regression: re-publishing the same (topic, partition_key, id) tuple
// must succeed silently. sqlmock returns 0 affected rows to simulate
// MySQL's ON DUPLICATE KEY UPDATE swallowing the unique-key collision.
name: "duplicate publish is idempotent",
messages: []queue.Message{
{ID: "msg-dup", Payload: []byte("payload"), PartitionKey: "part1", PublishedAt: time.Now().UnixMilli()},
},
setup: func(mock sqlmock.Sqlmock, messages []queue.Message) {
mock.ExpectBegin()
mock.ExpectPrepare("INSERT INTO queue_messages")
mock.ExpectExec("INSERT INTO queue_messages").
WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectCommit()
},
wantErr: false,
},
}

for _, tt := range tests {
Expand Down
9 changes: 6 additions & 3 deletions test/integration/extension/queue/mysql/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,10 +595,13 @@ func (s *SQLQueueIntegrationSuite) TestIdempotentPublish() {
require.NoError(t, err1)

err2 := publisher.Publish(s.ctx, topic, msg)
// Second publish should fail with duplicate key error since message already exists
require.Error(t, err2, "duplicate publish should return error")
// Per the queue_messages schema (and the documented "INSERT ... ON DUPLICATE KEY
// to enforce idempotent publishes" contract), a repeated publish for the same
// (topic, partition_key, id) tuple must succeed silently — the queue, not the
// caller, owns deduplication so retried publishes don't surface as errors.
require.NoError(t, err2, "duplicate publish must succeed silently")

t.Logf("Published same message twice - second attempt correctly rejected")
t.Logf("Published same message twice - second attempt silently deduped by queue")

// Should only receive once
delivery := receiveWithTimeout(t, deliveryChan)
Expand Down
Loading