diff --git a/entities/queue/message.go b/entities/queue/message.go index d82a9af1..2992bd64 100644 --- a/entities/queue/message.go +++ b/entities/queue/message.go @@ -27,15 +27,19 @@ type Message struct { PublishedAt int64 } -// NewMessage creates a new message with the given ID and payload. -// Metadata is initialized as an empty map. +// NewMessage creates a new message with the given ID, payload, partition key, and metadata. +// If metadata is nil, it will be initialized as an empty map. // PublishedAt is set to the current time. -func NewMessage(id string, payload []byte) Message { +func NewMessage(id string, payload []byte, partitionKey string, metadata map[string]string) Message { + if metadata == nil { + metadata = make(map[string]string) + } return Message{ - ID: id, - Payload: payload, - Metadata: make(map[string]string), - PublishedAt: time.Now().UnixMilli(), + ID: id, + Payload: payload, + PartitionKey: partitionKey, + Metadata: metadata, + PublishedAt: time.Now().UnixMilli(), } } diff --git a/entities/queue/message_test.go b/entities/queue/message_test.go index d1dcef1a..bc08cd7b 100644 --- a/entities/queue/message_test.go +++ b/entities/queue/message_test.go @@ -11,19 +11,18 @@ func TestNewMessage(t *testing.T) { id := "test-id" payload := []byte("test payload") - msg := NewMessage(id, payload) + msg := NewMessage(id, payload, "", nil) assert.Equal(t, id, msg.ID) assert.Equal(t, payload, msg.Payload) + assert.Empty(t, msg.PartitionKey) assert.NotNil(t, msg.Metadata) assert.Empty(t, msg.Metadata) assert.NotZero(t, msg.PublishedAt) } func TestMessage_Copy(t *testing.T) { - original := NewMessage("id-123", []byte("payload")) - original.Metadata["key"] = "value" - original.PartitionKey = "partition-1" + original := NewMessage("id-123", []byte("payload"), "partition-1", map[string]string{"key": "value"}) copied := original.Copy() @@ -44,7 +43,7 @@ func TestMessage_Copy(t *testing.T) { } func TestMessage_Copy_EmptyPayload(t *testing.T) { - original := NewMessage("id", []byte{}) + original := NewMessage("id", []byte{}, "", nil) copied := original.Copy() assert.NotNil(t, copied.Payload) @@ -53,16 +52,16 @@ func TestMessage_Copy_EmptyPayload(t *testing.T) { } func TestMessage_Fields(t *testing.T) { - msg := NewMessage("id-123", []byte("payload")) + msg := NewMessage("id-123", []byte("payload"), "user-123", map[string]string{ + "trace-id": "xyz", + "source": "gateway", + }) // Test metadata - msg.Metadata["trace-id"] = "xyz" - msg.Metadata["source"] = "gateway" assert.Equal(t, "xyz", msg.Metadata["trace-id"]) assert.Equal(t, "gateway", msg.Metadata["source"]) // Test partition key - msg.PartitionKey = "user-123" assert.Equal(t, "user-123", msg.PartitionKey) // Test PublishedAt can be overridden diff --git a/extensions/queue/sql/config.go b/extensions/queue/sql/config.go index bd79a0e6..aadca89b 100644 --- a/extensions/queue/sql/config.go +++ b/extensions/queue/sql/config.go @@ -60,6 +60,10 @@ type RetryConfig struct { type DLQConfig struct { // Enabled enables dead letter queue Enabled bool + + // TopicSuffix is appended to the original topic name to create the DLQ topic + // For example, if original topic is "orders" and suffix is "_dlq", DLQ topic will be "orders_dlq" + TopicSuffix string } // DefaultConfig returns a Config with sensible defaults @@ -79,7 +83,8 @@ func DefaultConfig(consumerGroup, workerID string) Config { BackoffMultiplier: 2.0, }, DLQ: DLQConfig{ - Enabled: true, + Enabled: true, + TopicSuffix: "_dlq", }, } } @@ -122,5 +127,8 @@ func (c *Config) Validate() error { if c.Retry.BackoffMultiplier < 1.0 { return fmt.Errorf("Retry.BackoffMultiplier must be >= 1.0") } + if c.DLQ.Enabled && c.DLQ.TopicSuffix == "" { + return fmt.Errorf("DLQ.TopicSuffix is required when DLQ is enabled") + } return nil } diff --git a/extensions/queue/sql/config_test.go b/extensions/queue/sql/config_test.go index 49ac11e3..6156e4a3 100644 --- a/extensions/queue/sql/config_test.go +++ b/extensions/queue/sql/config_test.go @@ -19,6 +19,7 @@ func TestDefaultConfig(t *testing.T) { assert.Equal(t, 10*time.Second, cfg.LeaseRenewalInterval) assert.Equal(t, 30*time.Second, cfg.LeaseDuration) assert.True(t, cfg.DLQ.Enabled) + assert.Equal(t, "_dlq", cfg.DLQ.TopicSuffix) assert.Equal(t, 3, cfg.Retry.MaxAttempts) assert.Equal(t, 1*time.Second, cfg.Retry.InitialBackoff) assert.Equal(t, 30*time.Second, cfg.Retry.MaxBackoff) @@ -92,6 +93,42 @@ func TestConfigValidation(t *testing.T) { }, expectError: true, }, + { + name: "DLQ enabled without topic suffix", + config: Config{ + ConsumerGroup: "test", + WorkerID: "test-worker", + PollInterval: 100 * time.Millisecond, + BatchSize: 10, + VisibilityTimeout: 60 * time.Second, + LeaseRenewalInterval: 10 * time.Second, + LeaseDuration: 30 * time.Second, + Retry: DefaultConfig("dummy", "dummy").Retry, + DLQ: DLQConfig{ + Enabled: true, + TopicSuffix: "", // Missing suffix + }, + }, + expectError: true, + }, + { + name: "DLQ disabled without topic suffix - valid", + config: Config{ + ConsumerGroup: "test", + WorkerID: "test-worker", + PollInterval: 100 * time.Millisecond, + BatchSize: 10, + VisibilityTimeout: 60 * time.Second, + LeaseRenewalInterval: 10 * time.Second, + LeaseDuration: 30 * time.Second, + Retry: DefaultConfig("dummy", "dummy").Retry, + DLQ: DLQConfig{ + Enabled: false, + TopicSuffix: "", // Suffix not required when disabled + }, + }, + expectError: false, + }, } for _, tt := range tests { diff --git a/extensions/queue/sql/message_store.go b/extensions/queue/sql/message_store.go index 7a0c566e..714c635c 100644 --- a/extensions/queue/sql/message_store.go +++ b/extensions/queue/sql/message_store.go @@ -72,8 +72,8 @@ func (s *sqlmessageStore) Insert(ctx context.Context, topic string, messages []q defer tx.Rollback() stmt, err := tx.PrepareContext(ctx, fmt.Sprintf(` - INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, retry_count, invisible_until) - VALUES (?, ?, ?, ?, ?, ?, ?, 0, 0) + INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, retry_count, invisible_until, failed_at, failure_count, last_error, original_topic) + VALUES (?, ?, ?, ?, ?, ?, ?, 0, 0, 0, 0, '', '') `, MessagesTableName)) if err != nil { s.logger.Errorw("failed to prepare statement", @@ -210,7 +210,7 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti // Fetch visible messages (invisible_until <= now) rows, err := tx.QueryContext(ctx, fmt.Sprintf(` - SELECT offset, id, payload, metadata, partition_key, retry_count, published_at + SELECT offset, id, payload, metadata, partition_key, retry_count, published_at, failed_at, failure_count, last_error, original_topic FROM %s WHERE topic = ? AND partition_key = ? AND offset > ? AND invisible_until <= ? ORDER BY offset @@ -239,9 +239,13 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti partKey string retryCount int publishedAtMilli int64 + failedAt int64 + failureCount int + lastError string + originalTopic string ) - if err := rows.Scan(&offset, &id, &payload, &metadataJSON, &partKey, &retryCount, &publishedAtMilli); err != nil { + if err := rows.Scan(&offset, &id, &payload, &metadataJSON, &partKey, &retryCount, &publishedAtMilli, &failedAt, &failureCount, &lastError, &originalTopic); err != nil { s.logger.Errorw("failed to scan message row", logTopic, topic, logPartitionKey, partitionKey, @@ -269,13 +273,17 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti } results = append(results, messageRow{ - Offset: offset, - ID: id, - Payload: payload, - Metadata: metadata, - PartitionKey: partKey, - RetryCount: retryCount, - PublishedAt: publishedAtMilli, + Offset: offset, + ID: id, + Payload: payload, + Metadata: metadata, + PartitionKey: partKey, + RetryCount: retryCount, + PublishedAt: publishedAtMilli, + FailedAt: failedAt, + FailureCount: failureCount, + LastError: lastError, + OriginalTopic: originalTopic, }) messageIDs = append(messageIDs, id) @@ -349,7 +357,9 @@ func (s *sqlmessageStore) FetchByOffset(ctx context.Context, topic string, parti return results, nil } -// MoveToDLQ atomically moves a message from the main table to the DLQ table +// MoveToDLQ atomically moves a message to the DLQ by reinserting it with the DLQ topic name +// The message is inserted back into queue_messages table with the DLQ topic (original + suffix) +// This allows DLQ messages to be consumed using the normal subscriber func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID string, failureCount int, lastError string) error { start := time.Now() success := false @@ -361,6 +371,9 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID s.metrics.Tagged(map[string]string{"result": result}).Timer("move_to_dlq.latency").Record(time.Since(start)) }() + // Construct DLQ topic name + dlqTopic := topic + s.config.DLQ.TopicSuffix + tx, err := s.db.BeginTx(ctx, nil) if err != nil { s.logger.Errorw("failed to begin transaction for DLQ move", @@ -380,13 +393,14 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID partitionKey string createdAtMilli int64 publishedAtMilli int64 + retryCount int ) err = tx.QueryRowContext(ctx, fmt.Sprintf(` - SELECT payload, metadata, partition_key, created_at, published_at + SELECT payload, metadata, partition_key, created_at, published_at, retry_count FROM %s WHERE topic = ? AND id = ? - `, MessagesTableName), topic, messageID).Scan(&payload, &metadataJSON, &partitionKey, &createdAtMilli, &publishedAtMilli) + `, MessagesTableName), topic, messageID).Scan(&payload, &metadataJSON, &partitionKey, &createdAtMilli, &publishedAtMilli, &retryCount) if err != nil { if err == sql.ErrNoRows { @@ -406,16 +420,19 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID return fmt.Errorf("failed to fetch message: %w", err) } - // Insert into DLQ table + // Insert into queue_messages table with DLQ topic name and DLQ-specific fields + // Reset retry_count to 0 since this is a new topic (DLQ processing starts fresh) + // Store the original failure count for tracking purposes now := start.UnixMilli() _, err = tx.ExecContext(ctx, fmt.Sprintf(` - INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, failed_at, failure_count, last_error) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `, DLQTableName), topic, messageID, payload, metadataJSON, partitionKey, createdAtMilli, publishedAtMilli, now, failureCount, lastError) + INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, invisible_until, retry_count, failed_at, failure_count, last_error, original_topic) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, MessagesTableName), dlqTopic, messageID, payload, metadataJSON, partitionKey, createdAtMilli, publishedAtMilli, int64(0), 0, now, failureCount, lastError, topic) if err != nil { - s.logger.Errorw("failed to insert into DLQ", + s.logger.Errorw("failed to insert into DLQ topic", logTopic, topic, + "dlq_topic", dlqTopic, logMessageID, messageID, logPartitionKey, partitionKey, "failure_count", failureCount, @@ -425,7 +442,7 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID return fmt.Errorf("failed to insert into DLQ: %w", err) } - // Delete from main table + // Delete from original topic _, err = tx.ExecContext(ctx, fmt.Sprintf(` DELETE FROM %s WHERE topic = ? AND id = ? `, MessagesTableName), topic, messageID) @@ -454,6 +471,7 @@ func (s *sqlmessageStore) MoveToDLQ(ctx context.Context, topic string, messageID s.metrics.Counter("messages.moved_to_dlq").Inc(1) s.logger.Infow("moved message to DLQ", logTopic, topic, + "dlq_topic", dlqTopic, logMessageID, messageID, logPartitionKey, partitionKey, "failure_count", failureCount, diff --git a/extensions/queue/sql/message_store_test.go b/extensions/queue/sql/message_store_test.go index 9ca26207..a6f7da59 100644 --- a/extensions/queue/sql/message_store_test.go +++ b/extensions/queue/sql/message_store_test.go @@ -113,9 +113,9 @@ func TestmessageStore_FetchByOffset(t *testing.T) { // Expect transaction begin mock.ExpectBegin() - // Mock query results - rows := sqlmock.NewRows([]string{"offset", "id", "payload", "metadata", "partition_key", "retry_count", "published_at"}). - AddRow(int64(1), "msg1", []byte("payload1"), []byte("{}"), "part1", 0, time.Now().UnixMilli()) + // Mock query results (including DLQ columns) + rows := sqlmock.NewRows([]string{"offset", "id", "payload", "metadata", "partition_key", "retry_count", "published_at", "failed_at", "failure_count", "last_error", "original_topic"}). + AddRow(int64(1), "msg1", []byte("payload1"), []byte("{}"), "part1", 0, time.Now().UnixMilli(), int64(0), 0, "", "") mock.ExpectQuery("SELECT (.+) FROM queue_messages"). WithArgs(topic, partitionKey, currentOffset, sqlmock.AnyArg(), limit). @@ -163,19 +163,26 @@ func TestmessageStore_MoveToDLQ(t *testing.T) { failureCount := 3 lastError := "test error" + // Get config to know the DLQ suffix + config := DefaultConfig("test-consumer", "test-worker") + dlqTopic := topic + config.DLQ.TopicSuffix // "test_topic_dlq" + // Expect transaction begin mock.ExpectBegin() - // Mock query for fetching message - SELECT payload, metadata, partition_key, created_at, published_at - rows := sqlmock.NewRows([]string{"payload", "metadata", "partition_key", "created_at", "published_at"}). - AddRow([]byte("payload1"), []byte("{}"), "part1", time.Now().UnixMilli(), time.Now().UnixMilli()) + // Mock query for fetching message - SELECT payload, metadata, partition_key, created_at, published_at, retry_count + rows := sqlmock.NewRows([]string{"payload", "metadata", "partition_key", "created_at", "published_at", "retry_count"}). + AddRow([]byte("payload1"), []byte(`{"key":"value"}`), "part1", time.Now().UnixMilli(), time.Now().UnixMilli(), failureCount) mock.ExpectQuery("SELECT (.+) FROM queue_messages"). WithArgs(topic, messageID). WillReturnRows(rows) - // Expect insert into DLQ - mock.ExpectExec("INSERT INTO queue_dlq"). + // Expect insert into queue_messages with DLQ topic and DLQ-specific columns + // Columns: topic, id, payload, metadata, partition_key, created_at, published_at, invisible_until, retry_count, failed_at, failure_count, last_error, original_topic + // Note: retry_count is reset to 0 for DLQ processing, but failure_count preserves the original attempts + mock.ExpectExec("INSERT INTO queue_messages"). + WithArgs(dlqTopic, messageID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), int64(0), 0, sqlmock.AnyArg(), failureCount, lastError, topic). WillReturnResult(sqlmock.NewResult(1, 1)) // Expect delete from main table diff --git a/extensions/queue/sql/publisher_test.go b/extensions/queue/sql/publisher_test.go index 8f02b1da..2064b121 100644 --- a/extensions/queue/sql/publisher_test.go +++ b/extensions/queue/sql/publisher_test.go @@ -163,8 +163,7 @@ func TestPublisher_PublishAfterClose(t *testing.T) { require.NoError(t, err) // Try to publish after close - msg := queue.NewMessage("msg1", []byte("payload")) - msg.PartitionKey = "part1" + msg := queue.NewMessage("msg1", []byte("payload"), "part1", nil) err = pub.Publish(ctx, "test_topic", msg) require.Error(t, err) } @@ -253,8 +252,7 @@ func TestValidateTopicName(t *testing.T) { // Try to publish with this topic name ctx := context.Background() - msg := queue.NewMessage("msg1", []byte("test")) - msg.PartitionKey = "part1" + msg := queue.NewMessage("msg1", []byte("test"), "part1", nil) if !tt.wantErr { mockStore.EXPECT().Insert(gomock.Any(), tt.topicName, gomock.Any()).Return(nil).Times(1) @@ -348,8 +346,7 @@ func TestPublisher_PublishContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - msg := queue.NewMessage("msg1", []byte("payload")) - msg.PartitionKey = "part1" + msg := queue.NewMessage("msg1", []byte("payload"), "part1", nil) // Should fail with context cancelled error err := pub.Publish(ctx, "test_topic", msg) diff --git a/extensions/queue/sql/schema/BUILD.bazel b/extensions/queue/sql/schema/BUILD.bazel index df9efc2e..9f8f6151 100644 --- a/extensions/queue/sql/schema/BUILD.bazel +++ b/extensions/queue/sql/schema/BUILD.bazel @@ -1,7 +1,6 @@ filegroup( name = "schema", srcs = [ - "queue_dlq.sql", "queue_messages.sql", "queue_offsets.sql", "queue_partition_leases.sql", diff --git a/extensions/queue/sql/schema/queue_dlq.sql b/extensions/queue/sql/schema/queue_dlq.sql deleted file mode 100644 index 7a939264..00000000 --- a/extensions/queue/sql/schema/queue_dlq.sql +++ /dev/null @@ -1,40 +0,0 @@ --- DEAD LETTER QUEUE TABLE --- Failed messages that exhausted retry attempts. - -CREATE TABLE IF NOT EXISTS queue_dlq ( - -- Auto-incrementing global offset for ordering/acking in DLQ - offset BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, - - -- Original topic and partition - topic VARCHAR(255) NOT NULL, - partition_key VARCHAR(255) NOT NULL, - - -- Message identification (for deduplication) - id VARCHAR(255) NOT NULL, - - -- Message data - payload BLOB NOT NULL, - metadata JSON, - - -- Original timestamps (epoch milliseconds) - created_at BIGINT UNSIGNED NOT NULL, - published_at BIGINT UNSIGNED NOT NULL, - - -- DLQ-specific fields - failed_at BIGINT UNSIGNED NOT NULL, - failure_count INT UNSIGNED NOT NULL, - last_error TEXT, - - -- Supports: SELECT ... WHERE topic=? AND partition_key=? AND failed_at>=? ORDER BY failed_at - -- Used for fetching recently failed messages for a specific topic/partition, e.g., for retrying or monitoring - INDEX idx_topic_partition_failed (topic, partition_key, failed_at), - - -- Supports: SELECT ... WHERE topic=? AND failed_at>=? ORDER BY failed_at - -- Used for fetching recently failed messages across all partitions of a topic - INDEX idx_failed_at (topic, failed_at), - - -- Unique constraint to prevent duplicate entries for the same message in the DLQ - -- Supports: INSERT ... ON DUPLICATE KEY to enforce idempotent DLQ operations - -- Also enables efficient lookups for retrying or inspecting specific failed messages - UNIQUE KEY idx_topic_partition_id (topic, partition_key, id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; diff --git a/extensions/queue/sql/schema/queue_messages.sql b/extensions/queue/sql/schema/queue_messages.sql index 1904c90c..a60851c5 100644 --- a/extensions/queue/sql/schema/queue_messages.sql +++ b/extensions/queue/sql/schema/queue_messages.sql @@ -31,6 +31,15 @@ CREATE TABLE IF NOT EXISTS queue_messages ( created_at BIGINT UNSIGNED NOT NULL, published_at BIGINT UNSIGNED NOT NULL, + -- DLQ-specific fields (0/"" for normal messages, populated for DLQ messages) + failed_at BIGINT UNSIGNED NOT NULL, + -- failure_count stores how many times the message failed on the ORIGINAL topic before moving to DLQ + -- This is different from retry_count, which tracks retries on the CURRENT topic and gets reset to 0 on DLQ move + -- We need both because: retry_count must reset for DLQ processing, but we still need to know original failure count + failure_count INT UNSIGNED NOT NULL, + last_error TEXT NOT NULL, + original_topic VARCHAR(255) NOT NULL, + -- Supports: SELECT ... WHERE topic=? AND partition_key=? AND invisible_until<=? ORDER BY offset -- Used by subscribers to poll for ready-to-process messages within their assigned partition INDEX idx_topic_partition_visible_offset (topic, partition_key, invisible_until, offset), diff --git a/extensions/queue/sql/stores.go b/extensions/queue/sql/stores.go index 2c012ed1..b46a7fcc 100644 --- a/extensions/queue/sql/stores.go +++ b/extensions/queue/sql/stores.go @@ -32,6 +32,14 @@ type messageRow struct { RetryCount int // PublishedAt is the Unix timestamp in milliseconds when message was published PublishedAt int64 + // FailedAt is the Unix timestamp in milliseconds when the message failed (0 for normal messages, >0 for DLQ) + FailedAt int64 + // FailureCount tracks total failures before moving to DLQ (0 for normal messages, >0 for DLQ) + FailureCount int + // LastError contains the error message from the final failure ("" for normal messages) + LastError string + // OriginalTopic is the topic where the message originally failed ("" for normal messages) + OriginalTopic string } // messageStore handles message table operations (internal use only) @@ -48,6 +56,7 @@ type messageStore interface { FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, limit int) ([]messageRow, error) // MoveToDLQ moves a message to the dead letter queue + // The DLQ topic is automatically constructed from the original topic plus the configured DLQ suffix MoveToDLQ(ctx context.Context, topic string, messageID string, failureCount int, lastError string) error // SetVisibilityTimeout sets the invisible_until timestamp for a message diff --git a/extensions/queue/sql/subscriber.go b/extensions/queue/sql/subscriber.go index f2b4991f..0b31128e 100644 --- a/extensions/queue/sql/subscriber.go +++ b/extensions/queue/sql/subscriber.go @@ -411,15 +411,18 @@ func (s *subscriber) fetchAndDeliverPartition(ctx context.Context, sub *subscrip // Move to DLQ if enabled if s.config.DLQ.Enabled { + dlqTopic := sub.topic + s.config.DLQ.TopicSuffix if err := s.messageStore.MoveToDLQ(ctx, sub.topic, row.ID, row.RetryCount, "exceeded retry limit"); err != nil { s.logger.Errorw("failed to move message to DLQ", "topic", sub.topic, + "dlq_topic", dlqTopic, "message_id", row.ID, "error", err, ) } else { s.logger.Infow("moved message to DLQ", "topic", sub.topic, + "dlq_topic", dlqTopic, "message_id", row.ID, "retry_count", row.RetryCount, ) @@ -443,9 +446,7 @@ func (s *subscriber) fetchAndDeliverPartition(ctx context.Context, sub *subscrip } // Create message (value type) - msg := queue.NewMessage(row.ID, row.Payload) - msg.Metadata = row.Metadata - msg.PartitionKey = row.PartitionKey + msg := queue.NewMessage(row.ID, row.Payload, row.PartitionKey, row.Metadata) msg.PublishedAt = row.PublishedAt // Calculate message age for metrics @@ -465,6 +466,20 @@ func (s *subscriber) fetchAndDeliverPartition(ctx context.Context, sub *subscrip "offset": deliveryID, } + // Add DLQ-specific metadata if this is a DLQ message + if row.FailedAt > 0 { + deliveryMetadata["dlq.failed_at"] = fmt.Sprintf("%d", row.FailedAt) + } + if row.FailureCount > 0 { + deliveryMetadata["dlq.failure_count"] = fmt.Sprintf("%d", row.FailureCount) + } + if row.LastError != "" { + deliveryMetadata["dlq.last_error"] = row.LastError + } + if row.OriginalTopic != "" { + deliveryMetadata["dlq.original_topic"] = row.OriginalTopic + } + // Create SQL delivery implementation delivery := newSQLDelivery( msg, diff --git a/go.mod b/go.mod index 32535aa6..e6eb7e7a 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/uber/submitqueue go 1.24.5 require ( + github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/go-sql-driver/mysql v1.9.3 github.com/gogo/protobuf v1.3.2 github.com/stretchr/testify v1.11.1 @@ -22,7 +23,6 @@ require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/BurntSushi/toml v1.2.1 // indirect - github.com/DATA-DOG/go-sqlmock v1.5.2 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect