Skip to content

Commit afee107

Browse files
committed
feat(sql/queue): merge DLQ logic into message table
Why? Previously, DLQ messages were stored in a separate queue_dlq table, requiring separate schema management and limiting the ability to consume from DLQ topics. By merging DLQ into the messages table with a topic suffix pattern (e.g., "orders" → "orders_dlq"), we can: - Reuse existing subscriber/publisher logic for DLQ consumption - Simplify schema management (single table) - Track DLQ-specific metadata (failure_count, last_error, etc.) - Enable DLQ reprocessing workflows What? - Add TopicSuffix to DLQConfig (default "_dlq") - Add DLQ columns to queue_messages schema (failed_at, failure_count, last_error, original_topic) as nullable fields - Update MoveToDLQ to insert into queue_messages with DLQ topic name - Expose DLQ metadata through delivery metadata for testing - Remove separate queue_dlq.sql schema - Update FetchByOffset to select and populate DLQ fields - Fix unit test mocks to include all 11 columns - Document retry_count vs failure_count distinction in schema Co-Authored-By: Preetam Dwivedi <preetam@uber.com>
1 parent 69b12cb commit afee107

14 files changed

Lines changed: 177 additions & 95 deletions

entities/queue/message.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,19 @@ type Message struct {
2727
PublishedAt int64
2828
}
2929

30-
// NewMessage creates a new message with the given ID and payload.
31-
// Metadata is initialized as an empty map.
30+
// NewMessage creates a new message with the given ID, payload, partition key, and metadata.
31+
// If metadata is nil, it will be initialized as an empty map.
3232
// PublishedAt is set to the current time.
33-
func NewMessage(id string, payload []byte) Message {
33+
func NewMessage(id string, payload []byte, partitionKey string, metadata map[string]string) Message {
34+
if metadata == nil {
35+
metadata = make(map[string]string)
36+
}
3437
return Message{
35-
ID: id,
36-
Payload: payload,
37-
Metadata: make(map[string]string),
38-
PublishedAt: time.Now().UnixMilli(),
38+
ID: id,
39+
Payload: payload,
40+
PartitionKey: partitionKey,
41+
Metadata: metadata,
42+
PublishedAt: time.Now().UnixMilli(),
3943
}
4044
}
4145

entities/queue/message_test.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,18 @@ func TestNewMessage(t *testing.T) {
1111
id := "test-id"
1212
payload := []byte("test payload")
1313

14-
msg := NewMessage(id, payload)
14+
msg := NewMessage(id, payload, "", nil)
1515

1616
assert.Equal(t, id, msg.ID)
1717
assert.Equal(t, payload, msg.Payload)
18+
assert.Empty(t, msg.PartitionKey)
1819
assert.NotNil(t, msg.Metadata)
1920
assert.Empty(t, msg.Metadata)
2021
assert.NotZero(t, msg.PublishedAt)
2122
}
2223

2324
func TestMessage_Copy(t *testing.T) {
24-
original := NewMessage("id-123", []byte("payload"))
25-
original.Metadata["key"] = "value"
26-
original.PartitionKey = "partition-1"
25+
original := NewMessage("id-123", []byte("payload"), "partition-1", map[string]string{"key": "value"})
2726

2827
copied := original.Copy()
2928

@@ -44,7 +43,7 @@ func TestMessage_Copy(t *testing.T) {
4443
}
4544

4645
func TestMessage_Copy_EmptyPayload(t *testing.T) {
47-
original := NewMessage("id", []byte{})
46+
original := NewMessage("id", []byte{}, "", nil)
4847
copied := original.Copy()
4948

5049
assert.NotNil(t, copied.Payload)
@@ -53,16 +52,16 @@ func TestMessage_Copy_EmptyPayload(t *testing.T) {
5352
}
5453

5554
func TestMessage_Fields(t *testing.T) {
56-
msg := NewMessage("id-123", []byte("payload"))
55+
msg := NewMessage("id-123", []byte("payload"), "user-123", map[string]string{
56+
"trace-id": "xyz",
57+
"source": "gateway",
58+
})
5759

5860
// Test metadata
59-
msg.Metadata["trace-id"] = "xyz"
60-
msg.Metadata["source"] = "gateway"
6161
assert.Equal(t, "xyz", msg.Metadata["trace-id"])
6262
assert.Equal(t, "gateway", msg.Metadata["source"])
6363

6464
// Test partition key
65-
msg.PartitionKey = "user-123"
6665
assert.Equal(t, "user-123", msg.PartitionKey)
6766

6867
// Test PublishedAt can be overridden

extensions/queue/sql/config.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ type RetryConfig struct {
6060
type DLQConfig struct {
6161
// Enabled enables dead letter queue
6262
Enabled bool
63+
64+
// TopicSuffix is appended to the original topic name to create the DLQ topic
65+
// For example, if original topic is "orders" and suffix is "_dlq", DLQ topic will be "orders_dlq"
66+
TopicSuffix string
6367
}
6468

6569
// DefaultConfig returns a Config with sensible defaults
@@ -79,7 +83,8 @@ func DefaultConfig(consumerGroup, workerID string) Config {
7983
BackoffMultiplier: 2.0,
8084
},
8185
DLQ: DLQConfig{
82-
Enabled: true,
86+
Enabled: true,
87+
TopicSuffix: "_dlq",
8388
},
8489
}
8590
}
@@ -122,5 +127,8 @@ func (c *Config) Validate() error {
122127
if c.Retry.BackoffMultiplier < 1.0 {
123128
return fmt.Errorf("Retry.BackoffMultiplier must be >= 1.0")
124129
}
130+
if c.DLQ.Enabled && c.DLQ.TopicSuffix == "" {
131+
return fmt.Errorf("DLQ.TopicSuffix is required when DLQ is enabled")
132+
}
125133
return nil
126134
}

extensions/queue/sql/config_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func TestDefaultConfig(t *testing.T) {
1919
assert.Equal(t, 10*time.Second, cfg.LeaseRenewalInterval)
2020
assert.Equal(t, 30*time.Second, cfg.LeaseDuration)
2121
assert.True(t, cfg.DLQ.Enabled)
22+
assert.Equal(t, "_dlq", cfg.DLQ.TopicSuffix)
2223
assert.Equal(t, 3, cfg.Retry.MaxAttempts)
2324
assert.Equal(t, 1*time.Second, cfg.Retry.InitialBackoff)
2425
assert.Equal(t, 30*time.Second, cfg.Retry.MaxBackoff)
@@ -92,6 +93,42 @@ func TestConfigValidation(t *testing.T) {
9293
},
9394
expectError: true,
9495
},
96+
{
97+
name: "DLQ enabled without topic suffix",
98+
config: Config{
99+
ConsumerGroup: "test",
100+
WorkerID: "test-worker",
101+
PollInterval: 100 * time.Millisecond,
102+
BatchSize: 10,
103+
VisibilityTimeout: 60 * time.Second,
104+
LeaseRenewalInterval: 10 * time.Second,
105+
LeaseDuration: 30 * time.Second,
106+
Retry: DefaultConfig("dummy", "dummy").Retry,
107+
DLQ: DLQConfig{
108+
Enabled: true,
109+
TopicSuffix: "", // Missing suffix
110+
},
111+
},
112+
expectError: true,
113+
},
114+
{
115+
name: "DLQ disabled without topic suffix - valid",
116+
config: Config{
117+
ConsumerGroup: "test",
118+
WorkerID: "test-worker",
119+
PollInterval: 100 * time.Millisecond,
120+
BatchSize: 10,
121+
VisibilityTimeout: 60 * time.Second,
122+
LeaseRenewalInterval: 10 * time.Second,
123+
LeaseDuration: 30 * time.Second,
124+
Retry: DefaultConfig("dummy", "dummy").Retry,
125+
DLQ: DLQConfig{
126+
Enabled: false,
127+
TopicSuffix: "", // Suffix not required when disabled
128+
},
129+
},
130+
expectError: false,
131+
},
95132
}
96133

97134
for _, tt := range tests {

extensions/queue/sql/message_store.go

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func (s *sqlMessageStore) FetchByOffset(ctx context.Context, topic string, parti
193193

194194
// Fetch visible messages (invisible_until <= now)
195195
rows, err := tx.QueryContext(ctx, fmt.Sprintf(`
196-
SELECT offset, id, payload, metadata, partition_key, retry_count, published_at
196+
SELECT offset, id, payload, metadata, partition_key, retry_count, published_at, failed_at, failure_count, last_error, original_topic
197197
FROM %s
198198
WHERE topic = ? AND partition_key = ? AND offset > ? AND invisible_until <= ?
199199
ORDER BY offset
@@ -222,9 +222,13 @@ func (s *sqlMessageStore) FetchByOffset(ctx context.Context, topic string, parti
222222
partKey string
223223
retryCount int
224224
publishedAtMilli int64
225+
failedAt sql.NullInt64
226+
failureCount sql.NullInt32
227+
lastError sql.NullString
228+
originalTopic sql.NullString
225229
)
226230

227-
if err := rows.Scan(&offset, &id, &payload, &metadataJSON, &partKey, &retryCount, &publishedAtMilli); err != nil {
231+
if err := rows.Scan(&offset, &id, &payload, &metadataJSON, &partKey, &retryCount, &publishedAtMilli, &failedAt, &failureCount, &lastError, &originalTopic); err != nil {
228232
s.logger.Errorw("failed to scan message row",
229233
logTopic, topic,
230234
logPartitionKey, partitionKey,
@@ -251,14 +255,40 @@ func (s *sqlMessageStore) FetchByOffset(ctx context.Context, topic string, parti
251255
metadata = make(map[string]string)
252256
}
253257

258+
// Convert sql.Null types to pointers
259+
var failedAtPtr *int64
260+
if failedAt.Valid {
261+
failedAtPtr = &failedAt.Int64
262+
}
263+
264+
var failureCountPtr *int
265+
if failureCount.Valid {
266+
val := int(failureCount.Int32)
267+
failureCountPtr = &val
268+
}
269+
270+
var lastErrorPtr *string
271+
if lastError.Valid {
272+
lastErrorPtr = &lastError.String
273+
}
274+
275+
var originalTopicPtr *string
276+
if originalTopic.Valid {
277+
originalTopicPtr = &originalTopic.String
278+
}
279+
254280
results = append(results, MessageRow{
255-
Offset: offset,
256-
ID: id,
257-
Payload: payload,
258-
Metadata: metadata,
259-
PartitionKey: partKey,
260-
RetryCount: retryCount,
261-
PublishedAt: publishedAtMilli,
281+
Offset: offset,
282+
ID: id,
283+
Payload: payload,
284+
Metadata: metadata,
285+
PartitionKey: partKey,
286+
RetryCount: retryCount,
287+
PublishedAt: publishedAtMilli,
288+
FailedAt: failedAtPtr,
289+
FailureCount: failureCountPtr,
290+
LastError: lastErrorPtr,
291+
OriginalTopic: originalTopicPtr,
262292
})
263293

264294
messageIDs = append(messageIDs, id)
@@ -331,13 +361,18 @@ func (s *sqlMessageStore) FetchByOffset(ctx context.Context, topic string, parti
331361
return results, nil
332362
}
333363

334-
// MoveToDLQ atomically moves a message from the main table to the DLQ table
364+
// MoveToDLQ atomically moves a message to the DLQ by reinserting it with the DLQ topic name
365+
// The message is inserted back into queue_messages table with the DLQ topic (original + suffix)
366+
// This allows DLQ messages to be consumed using the normal subscriber
335367
func (s *sqlMessageStore) MoveToDLQ(ctx context.Context, topic string, messageID string, failureCount int, lastError string) error {
336368
start := time.Now()
337369
defer func() {
338370
s.metrics.Timer("move_to_dlq.latency").Record(time.Since(start))
339371
}()
340372

373+
// Construct DLQ topic name
374+
dlqTopic := topic + s.config.DLQ.TopicSuffix
375+
341376
tx, err := s.db.BeginTx(ctx, nil)
342377
if err != nil {
343378
s.logger.Errorw("failed to begin transaction for DLQ move",
@@ -357,13 +392,14 @@ func (s *sqlMessageStore) MoveToDLQ(ctx context.Context, topic string, messageID
357392
partitionKey string
358393
createdAtMilli int64
359394
publishedAtMilli int64
395+
retryCount int
360396
)
361397

362398
err = tx.QueryRowContext(ctx, fmt.Sprintf(`
363-
SELECT payload, metadata, partition_key, created_at, published_at
399+
SELECT payload, metadata, partition_key, created_at, published_at, retry_count
364400
FROM %s
365401
WHERE topic = ? AND id = ?
366-
`, MessagesTableName), topic, messageID).Scan(&payload, &metadataJSON, &partitionKey, &createdAtMilli, &publishedAtMilli)
402+
`, MessagesTableName), topic, messageID).Scan(&payload, &metadataJSON, &partitionKey, &createdAtMilli, &publishedAtMilli, &retryCount)
367403

368404
if err != nil {
369405
if err == sql.ErrNoRows {
@@ -383,16 +419,19 @@ func (s *sqlMessageStore) MoveToDLQ(ctx context.Context, topic string, messageID
383419
return fmt.Errorf("failed to fetch message: %w", err)
384420
}
385421

386-
// Insert into DLQ table
422+
// Insert into queue_messages table with DLQ topic name and DLQ-specific fields
423+
// Reset retry_count to 0 since this is a new topic (DLQ processing starts fresh)
424+
// Store the original failure count for tracking purposes
387425
now := time.Now().UnixMilli()
388426
_, err = tx.ExecContext(ctx, fmt.Sprintf(`
389-
INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, failed_at, failure_count, last_error)
390-
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
391-
`, DLQTableName), topic, messageID, payload, metadataJSON, partitionKey, createdAtMilli, publishedAtMilli, now, failureCount, lastError)
427+
INSERT INTO %s (topic, id, payload, metadata, partition_key, created_at, published_at, invisible_until, retry_count, failed_at, failure_count, last_error, original_topic)
428+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
429+
`, MessagesTableName), dlqTopic, messageID, payload, metadataJSON, partitionKey, createdAtMilli, publishedAtMilli, int64(0), 0, now, failureCount, lastError, topic)
392430

393431
if err != nil {
394-
s.logger.Errorw("failed to insert into DLQ",
432+
s.logger.Errorw("failed to insert into DLQ topic",
395433
logTopic, topic,
434+
"dlq_topic", dlqTopic,
396435
logMessageID, messageID,
397436
logPartitionKey, partitionKey,
398437
"failure_count", failureCount,
@@ -402,7 +441,7 @@ func (s *sqlMessageStore) MoveToDLQ(ctx context.Context, topic string, messageID
402441
return fmt.Errorf("failed to insert into DLQ: %w", err)
403442
}
404443

405-
// Delete from main table
444+
// Delete from original topic
406445
_, err = tx.ExecContext(ctx, fmt.Sprintf(`
407446
DELETE FROM %s WHERE topic = ? AND id = ?
408447
`, MessagesTableName), topic, messageID)
@@ -431,6 +470,7 @@ func (s *sqlMessageStore) MoveToDLQ(ctx context.Context, topic string, messageID
431470
s.metrics.Counter("messages.moved_to_dlq").Inc(1)
432471
s.logger.Infow("moved message to DLQ",
433472
logTopic, topic,
473+
"dlq_topic", dlqTopic,
434474
logMessageID, messageID,
435475
logPartitionKey, partitionKey,
436476
"failure_count", failureCount,

extensions/queue/sql/message_store_test.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,9 @@ func TestMessageStore_FetchByOffset(t *testing.T) {
113113
// Expect transaction begin
114114
mock.ExpectBegin()
115115

116-
// Mock query results
117-
rows := sqlmock.NewRows([]string{"offset", "id", "payload", "metadata", "partition_key", "retry_count", "published_at"}).
118-
AddRow(int64(1), "msg1", []byte("payload1"), []byte("{}"), "part1", 0, time.Now().UnixMilli())
116+
// Mock query results (including DLQ columns)
117+
rows := sqlmock.NewRows([]string{"offset", "id", "payload", "metadata", "partition_key", "retry_count", "published_at", "failed_at", "failure_count", "last_error", "original_topic"}).
118+
AddRow(int64(1), "msg1", []byte("payload1"), []byte("{}"), "part1", 0, time.Now().UnixMilli(), nil, nil, nil, nil)
119119

120120
mock.ExpectQuery("SELECT (.+) FROM queue_messages").
121121
WithArgs(topic, partitionKey, currentOffset, sqlmock.AnyArg(), limit).
@@ -163,19 +163,26 @@ func TestMessageStore_MoveToDLQ(t *testing.T) {
163163
failureCount := 3
164164
lastError := "test error"
165165

166+
// Get config to know the DLQ suffix
167+
config := DefaultConfig("test-consumer", "test-worker")
168+
dlqTopic := topic + config.DLQ.TopicSuffix // "test_topic_dlq"
169+
166170
// Expect transaction begin
167171
mock.ExpectBegin()
168172

169-
// Mock query for fetching message - SELECT payload, metadata, partition_key, created_at, published_at
170-
rows := sqlmock.NewRows([]string{"payload", "metadata", "partition_key", "created_at", "published_at"}).
171-
AddRow([]byte("payload1"), []byte("{}"), "part1", time.Now().UnixMilli(), time.Now().UnixMilli())
173+
// Mock query for fetching message - SELECT payload, metadata, partition_key, created_at, published_at, retry_count
174+
rows := sqlmock.NewRows([]string{"payload", "metadata", "partition_key", "created_at", "published_at", "retry_count"}).
175+
AddRow([]byte("payload1"), []byte(`{"key":"value"}`), "part1", time.Now().UnixMilli(), time.Now().UnixMilli(), failureCount)
172176

173177
mock.ExpectQuery("SELECT (.+) FROM queue_messages").
174178
WithArgs(topic, messageID).
175179
WillReturnRows(rows)
176180

177-
// Expect insert into DLQ
178-
mock.ExpectExec("INSERT INTO queue_dlq").
181+
// Expect insert into queue_messages with DLQ topic and DLQ-specific columns
182+
// Columns: topic, id, payload, metadata, partition_key, created_at, published_at, invisible_until, retry_count, failed_at, failure_count, last_error, original_topic
183+
// Note: retry_count is reset to 0 for DLQ processing, but failure_count preserves the original attempts
184+
mock.ExpectExec("INSERT INTO queue_messages").
185+
WithArgs(dlqTopic, messageID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), int64(0), 0, sqlmock.AnyArg(), failureCount, lastError, topic).
179186
WillReturnResult(sqlmock.NewResult(1, 1))
180187

181188
// Expect delete from main table

extensions/queue/sql/mock_stores.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extensions/queue/sql/publisher_test.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,7 @@ func TestPublisher_PublishAfterClose(t *testing.T) {
163163
require.NoError(t, err)
164164

165165
// Try to publish after close
166-
msg := queue.NewMessage("msg1", []byte("payload"))
167-
msg.PartitionKey = "part1"
166+
msg := queue.NewMessage("msg1", []byte("payload"), "part1", nil)
168167
err = pub.Publish(ctx, "test_topic", msg)
169168
require.Error(t, err)
170169
}
@@ -253,8 +252,7 @@ func TestValidateTopicName(t *testing.T) {
253252

254253
// Try to publish with this topic name
255254
ctx := context.Background()
256-
msg := queue.NewMessage("msg1", []byte("test"))
257-
msg.PartitionKey = "part1"
255+
msg := queue.NewMessage("msg1", []byte("test"), "part1", nil)
258256

259257
if !tt.wantErr {
260258
mockStore.EXPECT().Insert(gomock.Any(), tt.topicName, gomock.Any()).Return(nil).Times(1)
@@ -348,8 +346,7 @@ func TestPublisher_PublishContextCancellation(t *testing.T) {
348346
ctx, cancel := context.WithCancel(context.Background())
349347
cancel()
350348

351-
msg := queue.NewMessage("msg1", []byte("payload"))
352-
msg.PartitionKey = "part1"
349+
msg := queue.NewMessage("msg1", []byte("payload"), "part1", nil)
353350

354351
// Should fail with context cancelled error
355352
err := pub.Publish(ctx, "test_topic", msg)

0 commit comments

Comments
 (0)