Skip to content

Commit 752bb37

Browse files
committed
feat(queue/sql): add subscriber with partition leasing and offset tracking
- Implement Subscriber interface with partition-based message polling - Add partition lease management for distributed workers - Add offset tracking per partition for consumption progress - Extend MessageStore interface with FetchByOffset, SetVisibilityTimeout, MoveToDLQ - Add OffsetStore interface for offset management - Add PartitionLeaseStore interface for partition leasing - Generate mocks for all three store interfaces - Add standardized metric tags (topic + partition_key) across all operations - Add comprehensive test coverage for subscription, ack/nack, and error handling
1 parent 7371c3c commit 752bb37

6 files changed

Lines changed: 1021 additions & 0 deletions

File tree

extensions/queue/sql/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@ go_library(
44
name = "sql",
55
srcs = [
66
"config.go",
7+
"errors.go",
78
"mock_stores.go",
89
"publisher.go",
910
"stores.go",
11+
"subscriber.go",
1012
"validation.go",
1113
],
1214
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
1315
visibility = ["//visibility:public"],
1416
deps = [
1517
"//entities/queue",
18+
"//extensions/queue",
1619
"@com_github_uber_go_tally_v4//:tally",
1720
"@org_uber_go_mock//gomock",
1821
"@org_uber_go_zap//:zap",
@@ -24,6 +27,7 @@ go_test(
2427
srcs = [
2528
"config_test.go",
2629
"publisher_test.go",
30+
"subscriber_test.go",
2731
],
2832
embed = [":sql"],
2933
deps = [

extensions/queue/sql/errors.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package sql
2+
3+
import "fmt"
4+
5+
// ErrAlreadyAcknowledged is returned when attempting to ack/nack a delivery that was already processed
6+
type ErrAlreadyAcknowledged struct {
7+
DeliveryID string
8+
}
9+
10+
func (e *ErrAlreadyAcknowledged) Error() string {
11+
return fmt.Sprintf("delivery %s already acknowledged or nacked", e.DeliveryID)
12+
}

extensions/queue/sql/mock_stores.go

Lines changed: 235 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extensions/queue/sql/stores.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,80 @@ import (
88
"github.com/uber/submitqueue/entities/queue"
99
)
1010

11+
// StoreConfig contains configuration needed by store implementations
12+
13+
// MessageRow represents a row from the messages table
14+
type MessageRow struct {
15+
// Offset is the auto-incrementing sequence number for message ordering within a partition
16+
Offset int64
17+
// ID is the unique message identifier
18+
ID string
19+
// Payload is the message body in bytes
20+
Payload []byte
21+
// Metadata contains key-value pairs for message attributes
22+
Metadata map[string]string
23+
// PartitionKey determines which partition this message belongs to for ordering guarantees
24+
PartitionKey string
25+
// RetryCount tracks how many times this message has been retried on the current topic
26+
RetryCount int
27+
// PublishedAt is the Unix timestamp in milliseconds when message was published
28+
PublishedAt int64
29+
}
30+
1131
// MessageStore handles message table operations
1232
type MessageStore interface {
1333
// Insert inserts messages into the topic table
1434
Insert(ctx context.Context, topic string, messages []queue.Message) error
35+
36+
// Delete deletes a message by ID
37+
Delete(ctx context.Context, topic string, messageID string) error
38+
39+
// FetchByOffset fetches messages with offset > currentOffset for a specific partition
40+
// Only fetches visible messages (invisible_until <= now)
41+
// Atomically sets invisible_until and increments retry_count
42+
FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, limit int) ([]MessageRow, error)
43+
44+
// MoveToDLQ moves a message to the dead letter queue
45+
MoveToDLQ(ctx context.Context, topic string, messageID string, failureCount int, lastError string) error
46+
47+
// SetVisibilityTimeout sets the invisible_until timestamp for a message
48+
// visibilityTimeoutMillis: milliseconds from now to hide the message
49+
// If visibilityTimeoutMillis is 0, makes the message visible immediately
50+
// If visibilityTimeoutMillis > 0, makes the message invisible until now + visibilityTimeoutMillis
51+
SetVisibilityTimeout(ctx context.Context, topic string, messageID string, visibilityTimeoutMillis int64) error
52+
}
53+
54+
// OffsetStore handles offset table operations for per-partition offset tracking
55+
type OffsetStore interface {
56+
// Initialize creates an offset entry for a topic+partition if it doesn't exist
57+
Initialize(ctx context.Context, topic string, partitionKey string) error
58+
59+
// GetAckedOffset returns the current acked offset for a topic+partition
60+
GetAckedOffset(ctx context.Context, topic string, partitionKey string) (int64, error)
61+
62+
// UpdateAckedOffset updates the offset_acked for a topic+partition (only if new offset is greater)
63+
UpdateAckedOffset(ctx context.Context, topic string, partitionKey string, offset int64) error
64+
65+
// AckMessage atomically deletes a message and updates the acked offset
66+
AckMessage(ctx context.Context, topic string, partitionKey string, messageID string, offset int64, messageStore MessageStore) error
67+
}
68+
69+
// PartitionLeaseStore handles partition lease operations
70+
type PartitionLeaseStore interface {
71+
// TryAcquireLease attempts to acquire or renew a lease for a partition
72+
// Returns true if lease is acquired/owned by this worker
73+
TryAcquireLease(ctx context.Context, topic string, partitionKey string) (bool, error)
74+
75+
// RenewLease renews the lease for a partition owned by this worker
76+
RenewLease(ctx context.Context, topic string, partitionKey string) error
77+
78+
// ReleaseLease releases the lease for a partition owned by this worker
79+
ReleaseLease(ctx context.Context, topic string, partitionKey string) error
80+
81+
// GetLeasedPartitions returns all partitions currently leased by this worker
82+
GetLeasedPartitions(ctx context.Context, topic string) ([]string, error)
83+
84+
// DiscoverAndAcquirePartitions discovers partitions from messages table and tries to acquire leases
85+
// Returns the number of new leases acquired
86+
DiscoverAndAcquirePartitions(ctx context.Context, topic string) (int, error)
1587
}

0 commit comments

Comments
 (0)