From fa66a8b8d39f3f47200659d83861aaaa75acd43d Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 8 May 2026 12:02:28 -0600 Subject: [PATCH] input: splice queued retries ahead of in-flight fresh batches When auto_replay_nacks is enabled, a nacked batch is supposed to be re-delivered to the pipeline before any subsequent fresh batch. The autoretry list's Shift already prioritises retries over fresh reads -- but AsyncReader's loop reads ahead: after pushing batch N to its unbuffered transactions channel, it spawns the ack-handler goroutine and immediately calls ReadBatch for batch N+1. By the time batch N errors and the ack-handler queues the retry, ReadBatch has already returned a fresh batch N+1 and pushed it onto the channel. The retry ends up scheduled behind the read-ahead. This breaks ordering for downstream consumers that rely on auto_replay_nacks for at-least-once delivery -- a partial output failure on a same-key partition surfaces as same-key reordering at the destination, even though every layer "preserves order" in isolation. Reproduced from connect#4387. Fix: race the channel send against a retry-available signal. When the signal wins, defer the in-flight fresh batch into a local queue and loop. The next iteration prefers (in order): 1. A queued retry (via TryShiftRetry, non-blocking). 2. A previously deferred fresh batch. 3. A new fresh read. Plumbed through: - autoretry.List grows a buffered (size 1, coalescing) retryNotify channel and a TryShiftRetry method. wrapPendingAck non-blocking sends on retryNotify whenever a retry is queued. - autoRetryInputBatched exposes both via unexported methods. - airGapBatchReader forwards them, gated on the underlying BatchInput satisfying the unexported retryAwareBatchInput interface -- so third-party BatchInput implementations are unaffected. - AsyncReader.loop type-asserts r.reader to an unexported retryAware interface (Go structural typing means no internal/public package coupling). Readers that don't implement it get the previous behaviour exactly. Tests: - TestRetryListNotifyAndTryShift pins the autoretry-level contract: notify fires on nack, doesn't fire on success, coalesces; TryShiftRetry returns false without dispatching a fresh read when the queue is empty. - TestBatchAutoRetryReadAheadOrdering is the integration regression: on the unfixed code it observes [A, B, A_retry] (read-ahead bug); with the fix it observes [A, A_retry, B]. Refs redpanda-data/connect#4387. --- internal/autoretry/auto_retry_list.go | 45 ++++- internal/autoretry/auto_retry_list_test.go | 106 ++++++++++++ internal/component/input/async_reader.go | 133 +++++++++++---- public/service/input.go | 43 +++++ public/service/input_auto_retry_batched.go | 18 ++ ...input_auto_retry_batched_readahead_test.go | 160 ++++++++++++++++++ 6 files changed, 464 insertions(+), 41 deletions(-) create mode 100644 public/service/input_auto_retry_batched_readahead_test.go diff --git a/internal/autoretry/auto_retry_list.go b/internal/autoretry/auto_retry_list.go index f15fe894e..794e2322c 100644 --- a/internal/autoretry/auto_retry_list.go +++ b/internal/autoretry/auto_retry_list.go @@ -58,8 +58,15 @@ type List[T any] struct { readInFlight int retryInFlight int cond sync.Cond - readCtx context.Context - readDone func() + // retryNotify is a buffered (size 1) signal channel. It is sent on + // (non-blocking) whenever a new retry is appended to pendingRetry, so + // downstream readers that are blocked dispatching a fresh-read batch can + // preempt that dispatch and prefer the retry instead. Size 1 + non-blocking + // send means signals coalesce: a single observer reading a single value + // indicates "one or more retries became available since the last drain". + retryNotify chan struct{} + readCtx context.Context + readDone func() } // NewList returns a new list of Ts requiring automatic retries. @@ -69,14 +76,34 @@ func NewList[T any](reader ReadFunc[T], mutator MutatorFunc[T]) *List[T] { } readCtx, readDone := context.WithCancel(context.Background()) return &List[T]{ - cond: *sync.NewCond(&sync.Mutex{}), - reader: reader, - mutator: mutator, - readCtx: readCtx, - readDone: readDone, + cond: *sync.NewCond(&sync.Mutex{}), + reader: reader, + mutator: mutator, + retryNotify: make(chan struct{}, 1), + readCtx: readCtx, + readDone: readDone, } } +// RetryNotifyChan returns a channel that delivers a (coalesced) signal each +// time a new retry has been queued. Readers can race the channel-send of an +// in-flight fresh batch against this signal in a select to splice retries +// ahead of fresh batches that have not yet been delivered downstream. Reading +// from the channel only signals that one or more retries are pending — call +// TryShiftRetry to obtain one. +func (l *List[T]) RetryNotifyChan() <-chan struct{} { + return l.retryNotify +} + +// TryShiftRetry returns a pending retry if one is immediately available, +// otherwise the zero T, a nil ack func, and false. Unlike Shift, this never +// dispatches a fresh read. +func (l *List[T]) TryShiftRetry(ctx context.Context) (t T, fn AckFunc, ok bool) { + l.cond.L.Lock() + defer l.cond.L.Unlock() + return l.tryShift(ctx) +} + // Adopt a T and its acknowledgement function so that a rejected T is added to // retry list. Returns a new acknowledgment function that should be propagated // as it encapsulates the retry logic. @@ -112,6 +139,10 @@ func (l *List[T]) wrapPendingAck(t *pendingT[T]) AckFunc { if err != nil { t.t = l.mutator(t.t, err) l.pendingRetry = append(l.pendingRetry, t) + select { + case l.retryNotify <- struct{}{}: + default: + } return } l.retryInFlight-- diff --git a/internal/autoretry/auto_retry_list_test.go b/internal/autoretry/auto_retry_list_test.go index f0187b527..b9858662e 100644 --- a/internal/autoretry/auto_retry_list_test.go +++ b/internal/autoretry/auto_retry_list_test.go @@ -15,6 +15,112 @@ import ( var errCustomEOF = errors.New("custom EOF") +// TestRetryListNotifyAndTryShift pins down the contract of RetryNotifyChan +// and TryShiftRetry: notify fires (coalesced) when a fresh retry is queued, +// never fires on a successful ack, and TryShiftRetry returns retries without +// ever dispatching a fresh read or blocking. +func TestRetryListNotifyAndTryShift(t *testing.T) { + tCtx, done := context.WithTimeout(t.Context(), time.Second) + defer done() + + var readCalls int + data := []string{"foo", "bar"} + l := NewList(func(ctx context.Context) (t string, aFn AckFunc, err error) { + readCalls++ + if len(data) == 0 { + err = errCustomEOF + return + } + t = data[0] + data = data[1:] + aFn = func(context.Context, error) error { return nil } + return + }, nil) + + notify := l.RetryNotifyChan() + + assertNoSignal := func() { + t.Helper() + select { + case <-notify: + t.Fatal("unexpected retry-notify signal") + default: + } + } + assertSignal := func() { + t.Helper() + select { + case <-notify: + case <-tCtx.Done(): + t.Fatal("expected retry-notify signal, got timeout") + } + } + + // Empty state: no signal, no retry, no fresh read dispatched. + assertNoSignal() + _, _, ok := l.TryShiftRetry(tCtx) + assert.False(t, ok, "TryShiftRetry returned a retry before any was queued") + assert.Equal(t, 0, readCalls, "TryShiftRetry must not dispatch a fresh read") + + // Take "foo" off the underlying reader, then nack it. The notify channel + // must fire and the retry must be drainable via TryShiftRetry. + v, fooFn, err := l.Shift(tCtx, true) + require.NoError(t, err) + require.Equal(t, "foo", v) + + require.NoError(t, fooFn(tCtx, errors.New("transient"))) + + assertSignal() + assertNoSignal() // single-shot — coalesced + + got, fooRetryFn, ok := l.TryShiftRetry(tCtx) + require.True(t, ok, "TryShiftRetry returned no retry despite one being queued") + assert.Equal(t, "foo", got) + + _, _, ok = l.TryShiftRetry(tCtx) + assert.False(t, ok, "queue must be empty after drain") + + // Successfully ack the retried foo. That must NOT fire notify. + require.NoError(t, fooRetryFn(tCtx, nil)) + assertNoSignal() + + // Coalescing: queue two retries before the consumer drains the signal. + // Re-issue Shift for "bar" (the next fresh value), then nack foo (already + // drained) is impossible — instead nack bar, then nack a second freshly + // shifted value. We have only "bar" left in data, so use a single nack to + // confirm coalescing-with-an-already-pending-signal: queue a retry, then + // queue another via the same path while the channel still has a signal. + v, barFn, err := l.Shift(tCtx, true) + require.NoError(t, err) + require.Equal(t, "bar", v) + require.NoError(t, barFn(tCtx, errors.New("transient"))) + + // The second nack would attempt a non-blocking send into a buffered (size + // 1) channel that already has a value — verify the send doesn't block by + // using a separate retry path. Re-take bar, nack again. + got, barRetry1, ok := l.TryShiftRetry(tCtx) + require.True(t, ok) + require.Equal(t, "bar", got) + require.NoError(t, barRetry1(tCtx, errors.New("transient2"))) + + // Drain at least one signal (could be 1 or 2 depending on timing — the + // contract is "fires when one or more retries become available since last + // drain"). + assertSignal() + // Drain remaining if any, then assert empty. + select { + case <-notify: + default: + } + assertNoSignal() + + got, barRetry2, ok := l.TryShiftRetry(tCtx) + require.True(t, ok, "second nack of bar must be retrievable") + assert.Equal(t, "bar", got) + require.NoError(t, barRetry2(tCtx, nil)) + assertNoSignal() +} + func TestRetryListAllAcks(t *testing.T) { tCtx, done := context.WithTimeout(t.Context(), time.Second) defer done() diff --git a/internal/component/input/async_reader.go b/internal/component/input/async_reader.go index 4a56cb2ef..19f08f6d2 100644 --- a/internal/component/input/async_reader.go +++ b/internal/component/input/async_reader.go @@ -35,6 +35,22 @@ type AsyncReader struct { shutSig *shutdown.Signaller } +// retryAware is an optional interface implemented by Async readers whose +// retry queue can be inspected. AsyncReader uses this (via type assertion) to +// splice queued retries ahead of in-flight fresh batches that have been read +// but not yet handed off to the downstream transactions channel. Preserving +// source ordering across retries matters when an output ack errors after the +// input has already read further records. +type retryAware interface { + // RetryNotifyChan returns a channel that fires (coalesced) when one or + // more retries become available since the last drain. + RetryNotifyChan() <-chan struct{} + + // TryShiftRetry returns a pending retry if one is immediately available, + // otherwise (nil, nil, false). Non-blocking. + TryShiftRetry(ctx context.Context) (message.Batch, AsyncAckFn, bool) +} + // NewAsyncReader creates a new AsyncReader input type. func NewAsyncReader( typeStr string, @@ -164,56 +180,104 @@ func (r *AsyncReader) loop() { mConn.Incr(1) r.connection.Store(component.ConnectionActive(r.mgr)) - for { - msg, ackFn, err := r.reader.ReadBatch(closeAtLeisureCtx) + // If the underlying reader exposes its retry queue, race in-flight + // fresh-batch sends against the retry-available signal so retries can + // preempt fresh batches that have not yet been handed off downstream. + ra, _ := r.reader.(retryAware) + var retryNotify <-chan struct{} + if ra != nil { + retryNotify = ra.RetryNotifyChan() + } - // If our reader says it is not connected. - if errors.Is(err, component.ErrNotConnected) { - mLostConn.Incr(1) - r.connection.Store(component.ConnectionFailing(r.mgr, component.ErrNotConnected)) + type deferredBatch struct { + msg message.Batch + ackFn AsyncAckFn + startedAt time.Time + } + var deferred []deferredBatch - // Continue to try to reconnect while still active. - if !initConnection() { - return + for { + var ( + msg message.Batch + ackFn AsyncAckFn + startedAt time.Time + ) + + // 1. A queued retry takes priority over everything else. + if ra != nil { + if rmsg, raFn, ok := ra.TryShiftRetry(closeAtLeisureCtx); ok { + msg, ackFn = rmsg, raFn + startedAt = time.Now() } - mConn.Incr(1) - r.connection.Store(component.ConnectionActive(r.mgr)) - continue } - // Close immediately if our reader is closed. - if r.shutSig.IsSoftStopSignalled() || errors.Is(err, component.ErrTypeClosed) { - return + // 2. Otherwise replay any fresh batch that was deferred when a retry + // preempted its push. + if msg == nil && len(deferred) > 0 { + d := deferred[0] + deferred = deferred[1:] + msg, ackFn, startedAt = d.msg, d.ackFn, d.startedAt } - if err != nil || len(msg) == 0 { - if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, component.ErrTimeout) && !errors.Is(err, component.ErrNotConnected) { - r.mgr.Logger().Error("Failed to read message: %v\n", err) + // 3. Otherwise read a fresh batch. + if msg == nil { + var err error + msg, ackFn, err = r.reader.ReadBatch(closeAtLeisureCtx) + + // If our reader says it is not connected. + if errors.Is(err, component.ErrNotConnected) { + mLostConn.Incr(1) + r.connection.Store(component.ConnectionFailing(r.mgr, component.ErrNotConnected)) + + // Continue to try to reconnect while still active. + if !initConnection() { + return + } + mConn.Incr(1) + r.connection.Store(component.ConnectionActive(r.mgr)) + continue } - nextBoff := r.readBackoff.NextBackOff() - if nextBoff == backoff.Stop { - r.mgr.Logger().Error("Maximum number of read attempt retries has been met, gracefully terminating input %v", r.typeStr) + // Close immediately if our reader is closed. + if r.shutSig.IsSoftStopSignalled() || errors.Is(err, component.ErrTypeClosed) { return } - select { - case <-time.After(nextBoff): - case <-r.shutSig.SoftStopChan(): - return + + if err != nil || len(msg) == 0 { + if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, component.ErrTimeout) && !errors.Is(err, component.ErrNotConnected) { + r.mgr.Logger().Error("Failed to read message: %v\n", err) + } + + nextBoff := r.readBackoff.NextBackOff() + if nextBoff == backoff.Stop { + r.mgr.Logger().Error("Maximum number of read attempt retries has been met, gracefully terminating input %v", r.typeStr) + return + } + select { + case <-time.After(nextBoff): + case <-r.shutSig.SoftStopChan(): + return + } + continue } - continue - } - r.readBackoff.Reset() - mRcvd.Incr(int64(msg.Len())) - r.mgr.Logger().Trace("Consumed %v messages from '%v'.\n", msg.Len(), r.typeStr) + r.readBackoff.Reset() + mRcvd.Incr(int64(msg.Len())) + r.mgr.Logger().Trace("Consumed %v messages from '%v'.\n", msg.Len(), r.typeStr) - startedAt := time.Now() + startedAt = time.Now() + } resChan := make(chan error, 1) tracing.InitSpans(r.mgr.Tracer(), traceName, msg) select { case r.transactions <- message.NewTransaction(msg, resChan): + case <-retryNotify: + // A retry became available while we were waiting to hand this batch + // off — defer it and loop so the next iteration picks up the retry + // first. + deferred = append(deferred, deferredBatch{msg: msg, ackFn: ackFn, startedAt: startedAt}) + continue case <-r.shutSig.SoftStopChan(): return } @@ -223,6 +287,7 @@ func (r *AsyncReader) loop() { m message.Batch, aFn AsyncAckFn, rChan chan error, + sAt time.Time, ) { defer pendingAcks.Done() @@ -235,13 +300,13 @@ func (r *AsyncReader) loop() { return } - mLatency.Timing(time.Since(startedAt).Nanoseconds()) + mLatency.Timing(time.Since(sAt).Nanoseconds()) tracing.FinishSpans(m) - if err = aFn(closeNowCtx, res); err != nil { + if err := aFn(closeNowCtx, res); err != nil { r.mgr.Logger().Error("Failed to acknowledge message: %v\n", err) } - }(msg, ackFn, resChan) + }(msg, ackFn, resChan, startedAt) } } diff --git a/public/service/input.go b/public/service/input.go index 5589e86a1..2e532a34d 100644 --- a/public/service/input.go +++ b/public/service/input.go @@ -182,6 +182,49 @@ func (a *airGapBatchReader) Close(ctx context.Context) error { return publicToInternalErr(a.r.Close(ctx)) } +// retryAwareBatchInput is satisfied by AutoRetryNacksBatched. We type-assert +// against the unexported method names so only the in-tree autoretry wrapper +// participates — third-party BatchInput implementations are unaffected. +type retryAwareBatchInput interface { + retryNotifyChan() <-chan struct{} + tryShiftRetry(ctx context.Context) (MessageBatch, AckFunc, bool) +} + +// RetryNotifyChan forwards the retry-available signal from the underlying +// BatchInput when it is the autoretry wrapper. AsyncReader uses this to +// splice retries ahead of in-flight fresh batches. Returns nil when the +// underlying input is not retry-aware (no preemption then; the previous +// pre-fix behaviour is preserved). +func (a *airGapBatchReader) RetryNotifyChan() <-chan struct{} { + rar, ok := a.r.(retryAwareBatchInput) + if !ok { + return nil + } + return rar.retryNotifyChan() +} + +// TryShiftRetry forwards a non-blocking retry shift to the underlying +// BatchInput when it is the autoretry wrapper. Returns (nil, nil, false) when +// the underlying input is not retry-aware or no retry is currently queued. +func (a *airGapBatchReader) TryShiftRetry(ctx context.Context) (message.Batch, input.AsyncAckFn, bool) { + rar, ok := a.r.(retryAwareBatchInput) + if !ok { + return nil, nil, false + } + batch, ackFn, ok := rar.tryShiftRetry(ctx) + if !ok { + return nil, nil, false + } + mBatch := make(message.Batch, len(batch)) + for i, p := range batch { + mBatch[i] = p.part + } + return mBatch, func(c context.Context, r error) error { + r = toPublicBatchError(r) + return ackFn(c, r) + }, true +} + //------------------------------------------------------------------------------ // ResourceInput provides access to an input resource. diff --git a/public/service/input_auto_retry_batched.go b/public/service/input_auto_retry_batched.go index 8beed8888..a332fb214 100644 --- a/public/service/input_auto_retry_batched.go +++ b/public/service/input_auto_retry_batched.go @@ -146,3 +146,21 @@ func (i *autoRetryInputBatched) Close(ctx context.Context) error { _ = i.retryList.Close(ctx) return i.child.Close(ctx) } + +// retryNotifyChan exposes the underlying retry-list's notification channel so +// that downstream readers (e.g. AsyncReader) can splice queued retries ahead +// of in-flight fresh batches. See autoretry.List.RetryNotifyChan. +func (i *autoRetryInputBatched) retryNotifyChan() <-chan struct{} { + return i.retryList.RetryNotifyChan() +} + +// tryShiftRetry returns a pending retry batch if one is immediately +// available, otherwise (nil, nil, false). Non-blocking; never dispatches a +// fresh read. +func (i *autoRetryInputBatched) tryShiftRetry(ctx context.Context) (MessageBatch, AckFunc, bool) { + t, fn, ok := i.retryList.TryShiftRetry(ctx) + if !ok { + return nil, nil, false + } + return t.Copy(), AckFunc(fn), true +} diff --git a/public/service/input_auto_retry_batched_readahead_test.go b/public/service/input_auto_retry_batched_readahead_test.go new file mode 100644 index 000000000..b4865614f --- /dev/null +++ b/public/service/input_auto_retry_batched_readahead_test.go @@ -0,0 +1,160 @@ +// Copyright 2026 Redpanda Data, Inc. + +package service + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/internal/component/input" + "github.com/redpanda-data/benthos/v4/internal/manager/mock" + "github.com/redpanda-data/benthos/v4/internal/message" +) + +// TestBatchAutoRetryReadAheadOrdering is a regression test for the read-ahead +// reordering that occurs when an AsyncReader-wrapped batch input with +// auto_replay_nacks is paired with a downstream that processes batches +// sequentially. The expected behavior is that when a batch is nacked, its +// retry is presented to the downstream BEFORE any batch the AsyncReader has +// already read but not yet pushed past the channel pipeline. The current code +// fails this — AsyncReader pushes a fresh-read batch into the unbuffered +// transactions channel without checking for an incoming retry, so the retry +// of the nacked batch ends up scheduled behind the fresh read. +// +// Scenario: +// +// 1. Mock batch input yields batches A, then B, then blocks. +// 2. AsyncReader iter 1 reads A, pushes to transactions channel. +// 3. Test pulls A. AsyncReader iter 2 reads B, blocks pushing to channel. +// 4. Test acks A with an error → autoretry queues A for retry. +// 5. Test pulls next from transactions channel. +// +// With the current bug, step 5 produces B (the read-ahead won the race against +// the retry-queue signal). With the splicing fix, step 5 produces A (the +// retry preempts the in-flight fresh push). +func TestBatchAutoRetryReadAheadOrdering(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + + readerImpl := newMockBatchInput() + readerImpl.msgsToSnd = []MessageBatch{ + {NewMessage([]byte("A"))}, + {NewMessage([]byte("B"))}, + } + + // Wrap with autoretry, then bridge to internal Async, then AsyncReader. + pres := AutoRetryNacksBatched(readerImpl) + nm := mock.NewManager() + rdr := newAirGapBatchReader(nm, pres) + asyncReader, err := input.NewAsyncReader("test", rdr, nm) + require.NoError(t, err) + + asyncReader.TriggerStartConsuming() + + // Drive the mock: connect, then yield A, then yield B. Subsequent + // ReadBatch calls block on readChan because we never send again. + go func() { + select { + case readerImpl.connChan <- nil: + case <-ctx.Done(): + return + } + select { + case readerImpl.readChan <- nil: // returns batch A + case <-ctx.Done(): + return + } + select { + case readerImpl.readChan <- nil: // returns batch B + case <-ctx.Done(): + return + } + }() + + // Drain the mock's ack channel in a goroutine: the underlying ackFn for a + // batch is only invoked by autoretry when the batch is finally acked + // successfully (via wrapPendingAck with err == nil). Each successful ack + // blocks on this chan, so we keep it drained. + t.Cleanup(func() { go drainAckChan(readerImpl.ackChan) }) + t.Cleanup(func() { go drainCloseChan(readerImpl.closeChan) }) + t.Cleanup(asyncReader.TriggerCloseNow) + + txnCh := asyncReader.TransactionChan() + + // Pull batch A. + tranA := pullTxn(t, ctx, txnCh) + require.Equal(t, "A", payloadString(t, tranA), "first batch should be A") + + // Wait briefly for AsyncReader to advance into iter 2 and block pushing + // batch B onto the unbuffered transactions channel. This is the timing + // window where the read-ahead reordering becomes observable. + time.Sleep(200 * time.Millisecond) + + // Ack A with an error → autoretry queues A for retry. + require.NoError(t, tranA.Ack(ctx, errors.New("transient"))) + + // Give autoretry's wrapPendingAck a moment to run and append A to its + // pendingRetry list. + time.Sleep(200 * time.Millisecond) + + // Pull the next batch. Per the contract that a queued retry should + // preempt an in-flight fresh batch, this must be A's retry, not B. + tranNext := pullTxn(t, ctx, txnCh) + got := payloadString(t, tranNext) + + assert.Equal(t, "A", got, + "expected A's retry to be delivered before fresh B; got %q "+ + "(read-ahead reordering: AsyncReader pushed batch B onto the unbuffered "+ + "transactions channel before the retry of A was queued in autoretry, "+ + "so the retry is now scheduled BEHIND B)", got) + + // Ack the retry success so autoretry releases A's underlying ack and the + // stream can drain. + require.NoError(t, tranNext.Ack(ctx, nil)) + + // Pull and ack B so we exit cleanly. + tranB := pullTxn(t, ctx, txnCh) + assert.Equal(t, "B", payloadString(t, tranB), "third delivery should be B") + require.NoError(t, tranB.Ack(ctx, nil)) +} + +func pullTxn(t *testing.T, ctx context.Context, ch <-chan message.Transaction) message.Transaction { + t.Helper() + select { + case tran := <-ch: + return tran + case <-ctx.Done(): + t.Fatalf("timed out pulling next transaction: %v", ctx.Err()) + return message.Transaction{} + } +} + +func payloadString(t *testing.T, tran message.Transaction) string { + t.Helper() + require.Equal(t, 1, tran.Payload.Len(), "expected single-record batch") + return string(tran.Payload[0].AsBytes()) +} + +func drainAckChan(ch chan error) { + for { + select { + case ch <- nil: + case <-time.After(2 * time.Second): + return + } + } +} + +func drainCloseChan(ch chan error) { + select { + case ch <- nil: + case <-time.After(2 * time.Second): + } +}