Skip to content

Commit 159dc03

Browse files
committed
refactor(queue/mysql): eliminate all time-based waiting in integration tests
Replace 13 time.Sleep, 8 require.Eventually, and ad-hoc time.After assertions with event-driven hooks. Add OnSignal channel to subscriber with typed HookSignal values (SignalDeliveryCheck, SignalPartitionUpdate) named after behavioral concerns rather than implementation details. Test helpers (waitForSignal, assertNoDelivery, waitForCondition) use the signal channel with a 10s safety-net timeout. Zero sleeps remain in test bodies.
1 parent 7ea8ef3 commit 159dc03

4 files changed

Lines changed: 397 additions & 265 deletions

File tree

extension/queue/mysql/sql.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ type Params struct {
4242

4343
// MetricsScope for metrics collection (required)
4444
MetricsScope tally.Scope
45+
46+
// OnSignal receives typed subscriber lifecycle signals (HookSignal).
47+
// Nil in production; used by integration tests for event-driven waits.
48+
OnSignal chan HookSignal
4549
}
4650

4751
// NewQueue creates a new SQL-based queue
@@ -79,6 +83,7 @@ func NewQueue(params Params) (queue.Queue, error) {
7983
heartbeatStore,
8084
deliveryStateStore,
8185
)
86+
subscriber.OnSignal = params.OnSignal
8287

8388
return &queueImpl{
8489
publisher: publisher,

extension/queue/mysql/subscriber.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,21 @@ const (
5555
gcIdleTickInterval = 100
5656
)
5757

58+
// HookSignal identifies the type of subscriber lifecycle event.
59+
// Named after behavioral concerns (what happened) rather than implementation
60+
// details (which loop ran), so signal names remain stable across refactors.
61+
type HookSignal int
62+
63+
const (
64+
// SignalDeliveryCheck is sent after the subscriber checks a partition for
65+
// deliverable messages (including watermark advancement).
66+
SignalDeliveryCheck HookSignal = iota
67+
68+
// SignalPartitionUpdate is sent after the subscriber evaluates partition
69+
// ownership (discovery, rebalance, lease renewal, heartbeat).
70+
SignalPartitionUpdate
71+
)
72+
5873
type subscriber struct {
5974
logger *zap.SugaredLogger
6075
scope tally.Scope
@@ -69,6 +84,10 @@ type subscriber struct {
6984
// Active subscriptions
7085
subscriptions map[string]*subscription
7186
subMu sync.Mutex
87+
88+
// OnSignal receives typed lifecycle signals. Nil in production.
89+
// Consumers filter by signal type to wait for specific events.
90+
OnSignal chan HookSignal
7291
}
7392

7493
type subscription struct {
@@ -309,6 +328,15 @@ func NewSubscriber(logger *zap.SugaredLogger, scope tally.Scope, messageStore me
309328
}
310329
}
311330

331+
// emitSignal sends a signal on OnSignal if set. Blocks until the signal is
332+
// received, allowing tests to synchronize by controlling when signals are drained.
333+
// Production code does not set OnSignal, so this is a no-op outside tests.
334+
func (s *subscriber) emitSignal(sig HookSignal) {
335+
if ch := s.OnSignal; ch != nil {
336+
ch <- sig
337+
}
338+
}
339+
312340
// advanceWatermark advances offset_acked to the highest contiguous acked offset.
313341
// All operations are idempotent — safe to call from multiple paths (Reject, retry-limit,
314342
// poll loop) and safe to retry on failure.
@@ -476,6 +504,7 @@ func (s *subscriber) managePartitions(ctx context.Context, sub *subscription) {
476504
if err := s.sendHeartbeat(ctx, sub); err != nil {
477505
s.logger.Errorw("heartbeat failed during lease error recovery", append(logFields, "error", err)...)
478506
}
507+
s.emitSignal(SignalPartitionUpdate)
479508
continue
480509
}
481510

@@ -490,11 +519,13 @@ func (s *subscriber) managePartitions(ctx context.Context, sub *subscription) {
490519
if err := s.sendHeartbeat(ctx, sub); err != nil {
491520
s.logger.Errorw("periodic heartbeat failed", append(logFields, "error", err)...)
492521
}
522+
s.emitSignal(SignalPartitionUpdate)
493523

494524
case <-discoveryTicker.C:
495525
if err := s.discoverAndReconcileWorkers(ctx, sub); err != nil {
496526
s.logger.Errorw("partition discovery failed, will retry on next tick", append(logFields, "error", err)...)
497527
}
528+
s.emitSignal(SignalPartitionUpdate)
498529
}
499530
}
500531
}
@@ -705,6 +736,7 @@ func (w *partitionWorker) run(ctx context.Context) {
705736
"error", err,
706737
)
707738
}
739+
w.subscriber.emitSignal(SignalDeliveryCheck)
708740
}
709741
}
710742
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Queue MySQL Integration Tests
2+
3+
Integration tests for the SQL-backed queue (publisher, subscriber, partitioning, rebalance, DLQ, crash recovery).
4+
5+
## Infrastructure
6+
7+
Tests run against a real MySQL 8.0 instance via Docker Compose (`docker-compose.yml`). The `testutil.ComposeStack` helper manages the container lifecycle:
8+
9+
1. Starts MySQL on a random ephemeral port
10+
2. Waits for the health check to pass
11+
3. Connects and applies schemas from `extension/queue/mysql/schema/`
12+
4. Tears down on test completion
13+
14+
All tests share a single MySQL instance within the suite (`SetupSuite` / `TearDownSuite`). Each test uses unique topic names to avoid cross-test interference.
15+
16+
## Running
17+
18+
```bash
19+
make integration-test # all integration tests
20+
bazel test //test/integration/extension/queue/mysql:mysql_test --test_output=streamed
21+
```
22+
23+
Requires Docker.
24+
25+
## Event-driven waiting
26+
27+
Tests use **zero `time.Sleep` calls**. Instead, they use the subscriber's `OnSignal` hook — a single channel that emits typed `HookSignal` values after internal lifecycle events complete:
28+
29+
| Signal | Meaning |
30+
|--------|---------|
31+
| `SignalDeliveryCheck` | A partition was checked for deliverable messages (including watermark advancement) |
32+
| `SignalPartitionUpdate` | Partition ownership was evaluated (discovery, rebalance, lease renewal) |
33+
34+
Signal names describe behavioral concerns, not implementation details, so they remain stable across internal refactors.
35+
36+
### Test helpers
37+
38+
| Helper | What it does |
39+
|--------|--------------|
40+
| `receiveWithTimeout` | Blocks on the delivery channel with a 10s safety-net timeout |
41+
| `waitForSignal` | Drains stale signals, then blocks until a signal of the requested type arrives |
42+
| `assertNoDelivery` | Waits for N signals of a given type, then asserts the delivery channel is empty |
43+
| `waitForCondition` | Waits for signals until a condition function returns true (used for rebalance convergence) |
44+
45+
### Why not use defaults for `testSubConfig`?
46+
47+
`testSubConfig` overrides visibility timeout (2s), lease duration (3s), and lease renewal interval (1s). The production defaults are 60s, 30s, and 10s respectively. These control real DB timeouts that the subscriber must wait for — even with event-driven hooks, a message stays invisible until the DB timeout expires. Short values keep crash recovery tests under 5s instead of 90s.
48+
49+
## Test categories
50+
51+
- **Publish/subscribe basics** — ordering, metadata, partitioning, late subscribers, idempotency
52+
- **Visibility and retry** — timeout expiry, `ExtendVisibilityTimeout`, nack with delay
53+
- **Crash recovery** — worker crash with in-flight messages, reject + crash, retry-limit + crash
54+
- **Consumer groups** — independent state, multiple workers in a group, load balancing
55+
- **Rebalance** — even distribution, subscriber leave, odd partitions, excess subscribers
56+
- **Watermark** — contiguous advancement with out-of-order acks
57+
- **Admin CLI** — topic stats, consumer lag, leases, offsets, delete/purge, reset

0 commit comments

Comments
 (0)