From 489373ffdd87356a12f91f9f1d6179da681a15a7 Mon Sep 17 00:00:00 2001 From: AlpNuhoglu Date: Tue, 16 Jun 2026 17:29:50 +0300 Subject: [PATCH] fix(nats): resolve golangci-lint failures (gosec G118, revive) - Fix gosec G118 (potential context leak) in `nats.go`: Removed the derived context and per-consumer `CancelFunc`. Consumer drain goroutines now listen on a shared `b.stop` channel that is closed during `Close()`. This provides cleaner, idempotent-safe early shutdowns without tracking individual cancellations. - Fix revive unused-parameter in `nats_test.go`: Renamed unused `Event` parameter to `_`. --- pkg/events/nats.go | 25 ++++++++++++++----------- pkg/events/nats_test.go | 2 +- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/pkg/events/nats.go b/pkg/events/nats.go index 3d3c000..53442d4 100644 --- a/pkg/events/nats.go +++ b/pkg/events/nats.go @@ -48,7 +48,7 @@ type NATSBus struct { durable string // durable consumer name prefix (per consuming service) mu sync.Mutex - cancels []context.CancelFunc + stop chan struct{} // closed by Close to trigger early consumer drain consumes []jetstream.ConsumeContext } @@ -100,7 +100,7 @@ func NewNATSBus(url, durableName string, workers int, m *metrics.Metrics, log *z return nil, fmt.Errorf("jetstream init: %w", err) } - b := &NATSBus{conn: conn, js: js, log: log, m: m, workers: workers, durable: durableName} + b := &NATSBus{conn: conn, js: js, log: log, m: m, workers: workers, durable: durableName, stop: make(chan struct{})} // Ensure every known stream exists. CreateOrUpdateStream is idempotent, so // every service can call this safely on boot regardless of start order. @@ -277,18 +277,19 @@ func (b *NATSBus) startConsumers(ctx context.Context, handler Handler, topics .. return fmt.Errorf("consume %s: %w", durable, err) } - // Graceful shutdown: when ctx ends, stop pulling, drain the pool, then - // stop the consume context. - shutdownCtx, cancel := context.WithCancel(ctx) + // Graceful shutdown: when the subscribe ctx ends OR Close is called, + // stop pulling, drain the pool, then stop the consume context. go func() { - <-shutdownCtx.Done() + select { + case <-ctx.Done(): + case <-b.stop: + } consume.Stop() // stop delivering new messages close(work) // let workers drain remaining buffered messages wg.Wait() }() b.mu.Lock() - b.cancels = append(b.cancels, cancel) b.consumes = append(b.consumes, consume) b.mu.Unlock() } @@ -356,16 +357,18 @@ func (b *NATSBus) process(ctx context.Context, topic string, handler Handler, ms } } -// Close stops all consumers and drains the NATS connection. +// Close stops all consumers and drains the NATS connection. It is safe to call +// once; the stop channel signals each consumer's drain goroutine to wind down. func (b *NATSBus) Close() error { b.mu.Lock() - for _, cancel := range b.cancels { - cancel() + select { + case <-b.stop: // already closed + default: + close(b.stop) } for _, c := range b.consumes { c.Stop() } - b.cancels = nil b.consumes = nil b.mu.Unlock() diff --git a/pkg/events/nats_test.go b/pkg/events/nats_test.go index 0b187cf..5ce9edd 100644 --- a/pkg/events/nats_test.go +++ b/pkg/events/nats_test.go @@ -49,7 +49,7 @@ func TestNATSPublishConsumeAck(t *testing.T) { defer cancel() gotTrace := make(chan trace.TraceID, 1) - handler := func(hctx context.Context, e Event) error { + handler := func(hctx context.Context, _ Event) error { gotTrace <- trace.SpanContextFromContext(hctx).TraceID() return nil }