Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions internal/autoretry/auto_retry_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,15 @@ type List[T any] struct {
readInFlight int
retryInFlight int
cond sync.Cond
readCtx context.Context
readDone func()
// retryNotify is a buffered (size 1) signal channel. A non-blocking send is
// performed on it whenever a new retry is appended to pendingRetry, so that
// downstream readers blocked dispatching a fresh-read batch can preempt that
// dispatch and prefer the retry instead. Because the buffer holds one value
// and sends never block, signals coalesce: receiving a single value means
// "one or more retries became available since the last drain".
retryNotify chan struct{}
readCtx context.Context
readDone func()
}

// NewList returns a new list of Ts requiring automatic retries.
Expand All @@ -69,14 +76,34 @@ func NewList[T any](reader ReadFunc[T], mutator MutatorFunc[T]) *List[T] {
}
readCtx, readDone := context.WithCancel(context.Background())
return &List[T]{
cond: *sync.NewCond(&sync.Mutex{}),
reader: reader,
mutator: mutator,
readCtx: readCtx,
readDone: readDone,
cond: *sync.NewCond(&sync.Mutex{}),
reader: reader,
mutator: mutator,
retryNotify: make(chan struct{}, 1),
readCtx: readCtx,
readDone: readDone,
}
}

// RetryNotifyChan exposes the receive side of the list's retry signal
// channel. A value arriving on it means that at least one retry has been
// queued since the channel was last drained; signals are coalesced, so one
// value may stand for several retries. A typical consumer selects on this
// channel alongside the send of a fresh batch, which lets it put retries ahead
// of fresh batches that have not been delivered downstream yet. The signal
// carries no payload: use TryShiftRetry to actually obtain a retry.
func (l *List[T]) RetryNotifyChan() <-chan struct{} {
	var signals <-chan struct{} = l.retryNotify
	return signals
}

// TryShiftRetry hands back a pending retry together with its ack func and
// true when one is immediately available. When none is available it yields
// the zero T, a nil ack func, and false. In contrast to Shift it is documented
// to never dispatch a fresh read.
//
// The list's lock is held for the duration of the call.
func (l *List[T]) TryShiftRetry(ctx context.Context) (t T, fn AckFunc, ok bool) {
	mu := l.cond.L
	mu.Lock()
	defer mu.Unlock()

	t, fn, ok = l.tryShift(ctx)
	return t, fn, ok
}

// Adopt a T and its acknowledgement function so that a rejected T is added to
// retry list. Returns a new acknowledgment function that should be propagated
// as it encapsulates the retry logic.
Expand Down Expand Up @@ -112,6 +139,10 @@ func (l *List[T]) wrapPendingAck(t *pendingT[T]) AckFunc {
if err != nil {
t.t = l.mutator(t.t, err)
l.pendingRetry = append(l.pendingRetry, t)
select {
case l.retryNotify <- struct{}{}:
default:
}
return
}
l.retryInFlight--
Expand Down
106 changes: 106 additions & 0 deletions internal/autoretry/auto_retry_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,112 @@ import (

var errCustomEOF = errors.New("custom EOF")

// TestRetryListNotifyAndTryShift pins down the contract of RetryNotifyChan
// and TryShiftRetry:
//   - the notify channel fires when a nack queues a retry;
//   - signals coalesce: several nacks without a drain in between leave
//     exactly one signal on the channel;
//   - a successful ack never fires the channel;
//   - TryShiftRetry returns queued retries and reports false on an empty
//     queue, without dispatching a fresh read.
func TestRetryListNotifyAndTryShift(t *testing.T) {
	tCtx, done := context.WithTimeout(t.Context(), time.Second)
	defer done()

	// The reader serves "foo" then "bar" and counts its invocations so the
	// test can verify that TryShiftRetry does not trigger a fresh read.
	var readCalls int
	data := []string{"foo", "bar"}
	l := NewList(func(context.Context) (string, AckFunc, error) {
		readCalls++
		if len(data) == 0 {
			return "", nil, errCustomEOF
		}
		next := data[0]
		data = data[1:]
		return next, func(context.Context, error) error { return nil }, nil
	}, nil)

	notify := l.RetryNotifyChan()

	// assertNoSignal fails the test if a signal is currently buffered.
	assertNoSignal := func() {
		t.Helper()
		select {
		case <-notify:
			t.Fatal("unexpected retry-notify signal")
		default:
		}
	}
	// assertSignal consumes one signal, failing once the test context expires.
	assertSignal := func() {
		t.Helper()
		select {
		case <-notify:
		case <-tCtx.Done():
			t.Fatal("expected retry-notify signal, got timeout")
		}
	}

	// Empty state: no signal, no retry, no fresh read dispatched.
	assertNoSignal()
	_, _, ok := l.TryShiftRetry(tCtx)
	assert.False(t, ok, "TryShiftRetry returned a retry before any was queued")
	assert.Equal(t, 0, readCalls, "TryShiftRetry must not dispatch a fresh read")

	// Take "foo" off the underlying reader, then nack it. The notify channel
	// must fire and the retry must be drainable via TryShiftRetry.
	v, fooFn, err := l.Shift(tCtx, true)
	require.NoError(t, err)
	require.Equal(t, "foo", v)

	require.NoError(t, fooFn(tCtx, errors.New("transient")))

	assertSignal()
	assertNoSignal() // one nack yields exactly one signal

	got, fooRetryFn, ok := l.TryShiftRetry(tCtx)
	require.True(t, ok, "TryShiftRetry returned no retry despite one being queued")
	assert.Equal(t, "foo", got)

	_, _, ok = l.TryShiftRetry(tCtx)
	assert.False(t, ok, "queue must be empty after drain")

	// Successfully ack the retried foo. That must NOT fire notify.
	require.NoError(t, fooRetryFn(tCtx, nil))
	assertNoSignal()

	// Coalescing: queue two retries without draining the signal in between.
	// Only "bar" is left in data, so the same value is nacked twice: first
	// through the ack returned by Shift, then through the ack returned by
	// TryShiftRetry.
	v, barFn, err := l.Shift(tCtx, true)
	require.NoError(t, err)
	require.Equal(t, "bar", v)
	require.NoError(t, barFn(tCtx, errors.New("transient")))

	// The channel now holds the signal from the first nack. Re-take bar and
	// nack it again: the non-blocking send finds the size-1 buffer full and
	// is dropped, so the ack must return without blocking.
	got, barRetry1, ok := l.TryShiftRetry(tCtx)
	require.True(t, ok)
	require.Equal(t, "bar", got)
	require.NoError(t, barRetry1(tCtx, errors.New("transient2")))

	// Two nacks and no drain in between: exactly one signal is observable.
	// Asserting this strictly (rather than tolerating a second signal) is
	// what actually pins the coalescing behaviour.
	assertSignal()
	assertNoSignal()

	got, barRetry2, ok := l.TryShiftRetry(tCtx)
	require.True(t, ok, "second nack of bar must be retrievable")
	assert.Equal(t, "bar", got)
	require.NoError(t, barRetry2(tCtx, nil))
	assertNoSignal()
}

func TestRetryListAllAcks(t *testing.T) {
tCtx, done := context.WithTimeout(t.Context(), time.Second)
defer done()
Expand Down
133 changes: 99 additions & 34 deletions internal/component/input/async_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ type AsyncReader struct {
shutSig *shutdown.Signaller
}

// retryAware is an optional interface implemented by Async readers whose
// retry queue can be inspected. AsyncReader discovers it via a type assertion
// on its reader and uses it to splice queued retries ahead of fresh batches
// that have been read but not yet handed off to the downstream transactions
// channel. Preserving source ordering across retries matters when an output
// ack errors after the input has already read further records.
type retryAware interface {
// RetryNotifyChan returns a channel that fires (coalesced) when one or
// more retries become available since the last drain. A received value
// carries no payload; call TryShiftRetry to obtain the retry itself.
RetryNotifyChan() <-chan struct{}

// TryShiftRetry returns a pending retry and its ack func if one is
// immediately available, otherwise (nil, nil, false). Non-blocking.
TryShiftRetry(ctx context.Context) (message.Batch, AsyncAckFn, bool)
}

// NewAsyncReader creates a new AsyncReader input type.
func NewAsyncReader(
typeStr string,
Expand Down Expand Up @@ -164,56 +180,104 @@ func (r *AsyncReader) loop() {
mConn.Incr(1)
r.connection.Store(component.ConnectionActive(r.mgr))

for {
msg, ackFn, err := r.reader.ReadBatch(closeAtLeisureCtx)
// If the underlying reader exposes its retry queue, race in-flight
// fresh-batch sends against the retry-available signal so retries can
// preempt fresh batches that have not yet been handed off downstream.
ra, _ := r.reader.(retryAware)
var retryNotify <-chan struct{}
if ra != nil {
retryNotify = ra.RetryNotifyChan()
}

// If our reader says it is not connected.
if errors.Is(err, component.ErrNotConnected) {
mLostConn.Incr(1)
r.connection.Store(component.ConnectionFailing(r.mgr, component.ErrNotConnected))
type deferredBatch struct {
msg message.Batch
ackFn AsyncAckFn
startedAt time.Time
}
var deferred []deferredBatch

// Continue to try to reconnect while still active.
if !initConnection() {
return
for {
var (
msg message.Batch
ackFn AsyncAckFn
startedAt time.Time
)

// 1. A queued retry takes priority over everything else.
if ra != nil {
if rmsg, raFn, ok := ra.TryShiftRetry(closeAtLeisureCtx); ok {
msg, ackFn = rmsg, raFn
startedAt = time.Now()
}
mConn.Incr(1)
r.connection.Store(component.ConnectionActive(r.mgr))
continue
}

// Close immediately if our reader is closed.
if r.shutSig.IsSoftStopSignalled() || errors.Is(err, component.ErrTypeClosed) {
return
// 2. Otherwise replay any fresh batch that was deferred when a retry
// preempted its push.
if msg == nil && len(deferred) > 0 {
d := deferred[0]
deferred = deferred[1:]
msg, ackFn, startedAt = d.msg, d.ackFn, d.startedAt
}

if err != nil || len(msg) == 0 {
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, component.ErrTimeout) && !errors.Is(err, component.ErrNotConnected) {
r.mgr.Logger().Error("Failed to read message: %v\n", err)
// 3. Otherwise read a fresh batch.
if msg == nil {
var err error
msg, ackFn, err = r.reader.ReadBatch(closeAtLeisureCtx)

// If our reader says it is not connected.
if errors.Is(err, component.ErrNotConnected) {
mLostConn.Incr(1)
r.connection.Store(component.ConnectionFailing(r.mgr, component.ErrNotConnected))

// Continue to try to reconnect while still active.
if !initConnection() {
return
}
mConn.Incr(1)
r.connection.Store(component.ConnectionActive(r.mgr))
continue
}

nextBoff := r.readBackoff.NextBackOff()
if nextBoff == backoff.Stop {
r.mgr.Logger().Error("Maximum number of read attempt retries has been met, gracefully terminating input %v", r.typeStr)
// Close immediately if our reader is closed.
if r.shutSig.IsSoftStopSignalled() || errors.Is(err, component.ErrTypeClosed) {
return
}
select {
case <-time.After(nextBoff):
case <-r.shutSig.SoftStopChan():
return

if err != nil || len(msg) == 0 {
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, component.ErrTimeout) && !errors.Is(err, component.ErrNotConnected) {
r.mgr.Logger().Error("Failed to read message: %v\n", err)
}

nextBoff := r.readBackoff.NextBackOff()
if nextBoff == backoff.Stop {
r.mgr.Logger().Error("Maximum number of read attempt retries has been met, gracefully terminating input %v", r.typeStr)
return
}
select {
case <-time.After(nextBoff):
case <-r.shutSig.SoftStopChan():
return
}
continue
}
continue
}

r.readBackoff.Reset()
mRcvd.Incr(int64(msg.Len()))
r.mgr.Logger().Trace("Consumed %v messages from '%v'.\n", msg.Len(), r.typeStr)
r.readBackoff.Reset()
mRcvd.Incr(int64(msg.Len()))
r.mgr.Logger().Trace("Consumed %v messages from '%v'.\n", msg.Len(), r.typeStr)

startedAt := time.Now()
startedAt = time.Now()
}

resChan := make(chan error, 1)
tracing.InitSpans(r.mgr.Tracer(), traceName, msg)
select {
case r.transactions <- message.NewTransaction(msg, resChan):
case <-retryNotify:
// A retry became available while we were waiting to hand this batch
// off — defer it and loop so the next iteration picks up the retry
// first.
deferred = append(deferred, deferredBatch{msg: msg, ackFn: ackFn, startedAt: startedAt})
continue
case <-r.shutSig.SoftStopChan():
return
}
Expand All @@ -223,6 +287,7 @@ func (r *AsyncReader) loop() {
m message.Batch,
aFn AsyncAckFn,
rChan chan error,
sAt time.Time,
) {
defer pendingAcks.Done()

Expand All @@ -235,13 +300,13 @@ func (r *AsyncReader) loop() {
return
}

mLatency.Timing(time.Since(startedAt).Nanoseconds())
mLatency.Timing(time.Since(sAt).Nanoseconds())
tracing.FinishSpans(m)

if err = aFn(closeNowCtx, res); err != nil {
if err := aFn(closeNowCtx, res); err != nil {
r.mgr.Logger().Error("Failed to acknowledge message: %v\n", err)
}
}(msg, ackFn, resChan)
}(msg, ackFn, resChan, startedAt)
}
}

Expand Down
Loading
Loading