Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions pkg/events/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type NATSBus struct {
durable string // durable consumer name prefix (per consuming service)

mu sync.Mutex
cancels []context.CancelFunc
stop chan struct{} // closed by Close to trigger early consumer drain
consumes []jetstream.ConsumeContext
}

Expand Down Expand Up @@ -100,7 +100,7 @@ func NewNATSBus(url, durableName string, workers int, m *metrics.Metrics, log *z
return nil, fmt.Errorf("jetstream init: %w", err)
}

b := &NATSBus{conn: conn, js: js, log: log, m: m, workers: workers, durable: durableName}
b := &NATSBus{conn: conn, js: js, log: log, m: m, workers: workers, durable: durableName, stop: make(chan struct{})}

// Ensure every known stream exists. CreateOrUpdateStream is idempotent, so
// every service can call this safely on boot regardless of start order.
Expand Down Expand Up @@ -277,18 +277,19 @@ func (b *NATSBus) startConsumers(ctx context.Context, handler Handler, topics ..
return fmt.Errorf("consume %s: %w", durable, err)
}

// Graceful shutdown: when ctx ends, stop pulling, drain the pool, then
// stop the consume context.
shutdownCtx, cancel := context.WithCancel(ctx)
// Graceful shutdown: when the subscribe ctx ends OR Close is called,
// stop pulling, drain the pool, then stop the consume context.
go func() {
<-shutdownCtx.Done()
select {
case <-ctx.Done():
case <-b.stop:
}
consume.Stop() // stop delivering new messages
close(work) // let workers drain remaining buffered messages
wg.Wait()
}()

b.mu.Lock()
b.cancels = append(b.cancels, cancel)
b.consumes = append(b.consumes, consume)
b.mu.Unlock()
}
Expand Down Expand Up @@ -356,16 +357,18 @@ func (b *NATSBus) process(ctx context.Context, topic string, handler Handler, ms
}
}

// Close stops all consumers and drains the NATS connection.
// Close stops all consumers and drains the NATS connection. The stop channel is
// closed at most once (guarded under the mutex), so a repeated call does not
// panic on a double close; closing it signals each consumer's drain goroutine
// to wind down.
func (b *NATSBus) Close() error {
b.mu.Lock()
for _, cancel := range b.cancels {
cancel()
select {
case <-b.stop: // already closed
default:
close(b.stop)
}
for _, c := range b.consumes {
c.Stop()
}
b.cancels = nil
b.consumes = nil
b.mu.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion pkg/events/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestNATSPublishConsumeAck(t *testing.T) {
defer cancel()

gotTrace := make(chan trace.TraceID, 1)
handler := func(hctx context.Context, e Event) error {
handler := func(hctx context.Context, _ Event) error {
gotTrace <- trace.SpanContextFromContext(hctx).TraceID()
return nil
}
Expand Down
Loading