5 changes: 5 additions & 0 deletions extension/queue/mysql/sql.go
@@ -42,6 +42,10 @@ type Params struct {

	// MetricsScope for metrics collection (required)
	MetricsScope tally.Scope

	// OnSignal receives typed subscriber lifecycle signals (HookSignal).
	// Nil in production; used by integration tests for event-driven waits.
	OnSignal chan HookSignal
}

// NewQueue creates a new SQL-based queue
@@ -79,6 +83,7 @@ func NewQueue(params Params) (queue.Queue, error) {
		heartbeatStore,
		deliveryStateStore,
	)
	subscriber.OnSignal = params.OnSignal

	return &queueImpl{
		publisher: publisher,
32 changes: 32 additions & 0 deletions extension/queue/mysql/subscriber.go
@@ -55,6 +55,21 @@ const (
	gcIdleTickInterval = 100
)

// HookSignal identifies the type of subscriber lifecycle event.
// Named after behavioral concerns (what happened) rather than implementation
// details (which loop ran), so signal names remain stable across refactors.
type HookSignal int

const (
	// SignalDeliveryCheck is sent after the subscriber checks a partition for
	// deliverable messages (including watermark advancement).
	SignalDeliveryCheck HookSignal = iota

	// SignalPartitionUpdate is sent after the subscriber evaluates partition
	// ownership (discovery, rebalance, lease renewal, heartbeat).
	SignalPartitionUpdate
)

type subscriber struct {
	logger *zap.SugaredLogger
	scope tally.Scope
@@ -69,6 +84,10 @@ type subscriber struct {
	// Active subscriptions
	subscriptions map[string]*subscription
	subMu sync.Mutex

	// OnSignal receives typed lifecycle signals. Nil in production.
	// Consumers filter by signal type to wait for specific events.
	OnSignal chan HookSignal
}

type subscription struct {
@@ -309,6 +328,15 @@ func NewSubscriber(logger *zap.SugaredLogger, scope tally.Scope, messageStore me
	}
}

// emitSignal sends a signal on OnSignal if set. Blocks until the signal is
// received, allowing tests to synchronize by controlling when signals are drained.
// Production code does not set OnSignal, so this is a no-op outside tests.
func (s *subscriber) emitSignal(sig HookSignal) {
	if ch := s.OnSignal; ch != nil {
		ch <- sig
	}
}

// advanceWatermark advances offset_acked to the highest contiguous acked offset.
// All operations are idempotent — safe to call from multiple paths (Reject, retry-limit,
// poll loop) and safe to retry on failure.
@@ -476,6 +504,7 @@ func (s *subscriber) managePartitions(ctx context.Context, sub *subscription) {
				if err := s.sendHeartbeat(ctx, sub); err != nil {
					s.logger.Errorw("heartbeat failed during lease error recovery", append(logFields, "error", err)...)
				}
				s.emitSignal(SignalPartitionUpdate)
				continue
			}

@@ -490,11 +519,13 @@ func (s *subscriber) managePartitions(ctx context.Context, sub *subscription) {
			if err := s.sendHeartbeat(ctx, sub); err != nil {
				s.logger.Errorw("periodic heartbeat failed", append(logFields, "error", err)...)
			}
			s.emitSignal(SignalPartitionUpdate)

		case <-discoveryTicker.C:
			if err := s.discoverAndReconcileWorkers(ctx, sub); err != nil {
				s.logger.Errorw("partition discovery failed, will retry on next tick", append(logFields, "error", err)...)
			}
			s.emitSignal(SignalPartitionUpdate)
		}
	}
}
@@ -705,6 +736,7 @@ func (w *partitionWorker) run(ctx context.Context) {
					"error", err,
				)
			}
			w.subscriber.emitSignal(SignalDeliveryCheck)
		}
	}
}
57 changes: 57 additions & 0 deletions test/integration/extension/queue/mysql/README.md
@@ -0,0 +1,57 @@
# Queue MySQL Integration Tests

Integration tests for the SQL-backed queue (publisher, subscriber, partitioning, rebalance, DLQ, crash recovery).

## Infrastructure

Tests run against a real MySQL 8.0 instance via Docker Compose (`docker-compose.yml`). The `testutil.ComposeStack` helper manages the container lifecycle:

1. Starts MySQL on a random ephemeral port
2. Waits for the health check to pass
3. Connects and applies schemas from `extension/queue/mysql/schema/`
4. Tears down on test completion

All tests share a single MySQL instance within the suite (`SetupSuite` / `TearDownSuite`). Each test uses unique topic names to avoid cross-test interference.
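
One way to get per-test topic names is to combine the test name with a per-run identifier and a process-wide counter. The sketch below is an assumption about how this could be done, not the suite's actual helper: `uniqueTopic`, `topicSeq`, and `runID` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// topicSeq is a process-wide counter (hypothetical name) so that two calls
// in the same test run never return the same topic.
var topicSeq uint64

// runID distinguishes separate runs that might reuse the same database.
var runID = time.Now().UnixNano()

// uniqueTopic returns a topic name of the form "<testName>-<runID>-<n>".
// It is safe to call from concurrent tests because the counter is atomic.
func uniqueTopic(testName string) string {
	n := atomic.AddUint64(&topicSeq, 1)
	return fmt.Sprintf("%s-%d-%d", testName, runID, n)
}
```

Because the counter only increases, two tests that pass the same `testName` still publish to different topics.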

## Running

```bash
make integration-test # all integration tests
bazel test //test/integration/extension/queue/mysql:mysql_test --test_output=streamed
```

Requires Docker.

## Event-driven waiting

Tests use **zero `time.Sleep` calls**. Instead, they use the subscriber's `OnSignal` hook — a single channel that emits typed `HookSignal` values after internal lifecycle events complete:

| Signal | Meaning |
|--------|---------|
| `SignalDeliveryCheck` | A partition was checked for deliverable messages (including watermark advancement) |
| `SignalPartitionUpdate` | Partition ownership was evaluated (discovery, rebalance, lease renewal, heartbeat) |

Signal names describe behavioral concerns, not implementation details, so they remain stable across internal refactors.
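
The blocking behaviour of the hook can be seen in isolation. The sketch below redeclares `HookSignal` so it compiles on its own and uses a hypothetical `fakeSubscriber` in place of the real subscriber, which needs MySQL. Its `emitSignal` mirrors the method in `subscriber.go`; `countUntil` and `runDemo` are illustrative names, not part of the test suite.

```go
package main

import "sync"

// HookSignal and its constants are redeclared from subscriber.go so the
// sketch is self-contained.
type HookSignal int

const (
	SignalDeliveryCheck HookSignal = iota
	SignalPartitionUpdate
)

// fakeSubscriber is a hypothetical stand-in for the real subscriber.
type fakeSubscriber struct {
	OnSignal chan HookSignal
}

// emitSignal mirrors the real method: a no-op when OnSignal is nil,
// otherwise a blocking send.
func (s *fakeSubscriber) emitSignal(sig HookSignal) {
	if ch := s.OnSignal; ch != nil {
		ch <- sig
	}
}

// countUntil reads signals until it has seen `want` n times and returns the
// total number of signals it consumed, including the ones it filtered out.
func countUntil(ch <-chan HookSignal, want HookSignal, n int) int {
	total := 0
	for seen := 0; seen < n; {
		sig := <-ch
		total++
		if sig == want {
			seen++
		}
	}
	return total
}

// runDemo emits a fixed sequence from a goroutine and waits for the second
// SignalDeliveryCheck. The emitter cannot run ahead of the consumer because
// the channel is unbuffered.
func runDemo() int {
	s := &fakeSubscriber{OnSignal: make(chan HookSignal)}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		seq := []HookSignal{
			SignalPartitionUpdate, SignalDeliveryCheck,
			SignalPartitionUpdate, SignalDeliveryCheck,
		}
		for _, sig := range seq {
			s.emitSignal(sig)
		}
	}()
	total := countUntil(s.OnSignal, SignalDeliveryCheck, 2)
	wg.Wait()
	return total
}
```

With an unbuffered channel every emit waits for a reader, so a test that sets `OnSignal` has to keep draining it for as long as the subscriber runs; otherwise the emitting loop stalls at its next signal.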

### Test helpers

| Helper | What it does |
|--------|--------------|
| `receiveWithTimeout` | Blocks on the delivery channel with a 10s safety-net timeout |
| `waitForSignal` | Drains stale signals, then blocks until a signal of the requested type arrives |
| `assertNoDelivery` | Waits for N signals of a given type, then asserts the delivery channel is empty |
| `waitForCondition` | Waits for signals until a condition function returns true (used for rebalance convergence) |

### Why not use defaults for `testSubConfig`?

`testSubConfig` overrides visibility timeout (2s), lease duration (3s), and lease renewal interval (1s). The production defaults are 60s, 30s, and 10s respectively. These control real DB timeouts that the subscriber must wait for — even with event-driven hooks, a message stays invisible until the DB timeout expires. Short values keep crash recovery tests under 5s instead of 90s.

## Test categories

- **Publish/subscribe basics** — ordering, metadata, partitioning, late subscribers, idempotency
- **Visibility and retry** — timeout expiry, `ExtendVisibilityTimeout`, nack with delay
- **Crash recovery** — worker crash with in-flight messages, reject + crash, retry-limit + crash
- **Consumer groups** — independent state, multiple workers in a group, load balancing
- **Rebalance** — even distribution, subscriber leave, odd partitions, excess subscribers
- **Watermark** — contiguous advancement with out-of-order acks
- **Admin CLI** — topic stats, consumer lag, leases, offsets, delete/purge, reset
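
The watermark category relies on the rule that `offset_acked` only advances across a gap-free run of acked offsets. A minimal in-memory illustration of that rule follows. It is not the subscriber's `advanceWatermark`, which works against the database; `contiguousWatermark`, the map-based ack set, and the convention that a watermark of 0 means "nothing acked yet" are assumptions made for the example.

```go
package main

// contiguousWatermark returns the highest offset w >= watermark such that
// every offset in (watermark, w] is present in acked. Calling it again with
// the returned value and the same acked set returns the same value, so the
// operation is idempotent.
func contiguousWatermark(watermark int64, acked map[int64]bool) int64 {
	for acked[watermark+1] {
		watermark++
	}
	return watermark
}
```

If acks arrive in the order 3, 1, 2 starting from a watermark of 0, the result stays at 0 after the first ack, moves to 1 after the second, and jumps to 3 after the third.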