diff --git a/extension/queue/mysql/message_store.go b/extension/queue/mysql/message_store.go index e0815451..8afa0d31 100644 --- a/extension/queue/mysql/message_store.go +++ b/extension/queue/mysql/message_store.go @@ -44,7 +44,15 @@ func newMessageStore(db *sql.DB, logger *zap.SugaredLogger, scope tally.Scope) m } } -// Insert inserts messages into the messages table +// Insert inserts messages into the messages table. +// +// Publishes are idempotent on the (topic, partition_key, id) unique key: a +// repeated publish for the same key is silently treated as success and does +// not overwrite the original payload. This matches the queue_messages schema's +// documented intent ("Supports: INSERT ... ON DUPLICATE KEY to enforce +// idempotent publishes") and lets callers safely retry publishes (e.g. a +// second Cancel RPC for the same request) without surfacing 1062 duplicate-key +// errors. func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []queue.Message) (retErr error) { op := metrics.Begin(s.scope, "insert", metrics.NewTag("topic", topic)) defer func() { op.Complete(retErr) }() @@ -64,9 +72,12 @@ func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []q } defer tx.Rollback() + // ON DUPLICATE KEY UPDATE topic=topic is a no-op write that makes MySQL + // swallow the unique-key violation without mutating the existing row. stmt, err := tx.PrepareContext(ctx, fmt.Sprintf(` INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, failed_at, failure_count, last_error, original_topic) VALUES (?, ?, ?, ?, ?, ?, ?, 0, 0, '', '') + ON DUPLICATE KEY UPDATE topic = topic `, MessagesTableName)) if err != nil { return fmt.Errorf("prepare statement topic=%s: %w", topic, err) diff --git a/extension/queue/mysql/message_store_test.go b/extension/queue/mysql/message_store_test.go index 1b9bdd18..85e401f5 100644 --- a/extension/queue/mysql/message_store_test.go +++ b/extension/queue/mysql/message_store_test.go @@ -76,6 +76,23 @@ func TestMessageStore_Insert(t *testing.T) { setup: func(mock sqlmock.Sqlmock, messages []queue.Message) {}, wantErr: false, }, + { + // Regression: re-publishing the same (topic, partition_key, id) tuple + // must succeed silently. sqlmock returns 0 affected rows to simulate + // MySQL's ON DUPLICATE KEY UPDATE swallowing the unique-key collision. + name: "duplicate publish is idempotent", + messages: []queue.Message{ + {ID: "msg-dup", Payload: []byte("payload"), PartitionKey: "part1", PublishedAt: time.Now().UnixMilli()}, + }, + setup: func(mock sqlmock.Sqlmock, messages []queue.Message) { + mock.ExpectBegin() + mock.ExpectPrepare("INSERT INTO queue_messages") + mock.ExpectExec("INSERT INTO queue_messages"). + WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectCommit() + }, + wantErr: false, + }, } for _, tt := range tests { diff --git a/test/integration/extension/queue/mysql/queue_test.go b/test/integration/extension/queue/mysql/queue_test.go index c669cdee..728711ef 100644 --- a/test/integration/extension/queue/mysql/queue_test.go +++ b/test/integration/extension/queue/mysql/queue_test.go @@ -595,10 +595,13 @@ func (s *SQLQueueIntegrationSuite) TestIdempotentPublish() { require.NoError(t, err1) err2 := publisher.Publish(s.ctx, topic, msg) - // Second publish should fail with duplicate key error since message already exists - require.Error(t, err2, "duplicate publish should return error") + // Per the queue_messages schema (and the documented "INSERT ... ON DUPLICATE KEY + // to enforce idempotent publishes" contract), a repeated publish for the same + // (topic, partition_key, id) tuple must succeed silently — the queue, not the + // caller, owns deduplication so retried publishes don't surface as errors. + require.NoError(t, err2, "duplicate publish must succeed silently") - t.Logf("Published same message twice - second attempt correctly rejected") + t.Logf("Published same message twice - second attempt silently deduped by queue") // Should only receive once delivery := receiveWithTimeout(t, deliveryChan)