Skip to content

Commit 03ef128

Browse files
sbalabanov-zzclaude
andcommitted
feat(dlq): per-topic DLQ reconciliation + ErrorProcessor split
Adds a dlq controller package that drains every primary pipeline topic's {topic}_dlq companion and transitions the affected request or batch to a terminal failed state, so requests cannot be stuck in non-terminal states after the primary controller exhausts its retry budget. Splits error-classification policy from consumer transport via a new errs.ErrorProcessor interface with two implementations: NewClassifierProcessor for the primary pipeline (framework-wrap short-circuit + classifier walk) and AlwaysRetryableProcessor for DLQ consumers (every failure redelivered, paired with high MaxAttempts and DLQ.Enabled=false to avoid a _dlq_dlq cascade). Documentation updates cover the new package design, the processor choices, and the per-stage DLQ companions in the workflow RFC. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent f17c76f commit 03ef128

30 files changed

Lines changed: 2184 additions & 299 deletions

File tree

CLAUDE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,3 +304,4 @@ Errors are classified by origin (user vs infra) and retryability. The framework
304304
3. **Extensions return plain errors** — extension interfaces (`MergeChecker`, `Storage`, `Publisher`) return standard `error` values with their own domain sentinels (e.g. `storage.ErrNotFound`). They do NOT classify errors as user or infra.
305305
4. **Controllers classify errors** — the service controller that calls an extension decides whether the failure is user-caused or infrastructure-caused. The same extension error may be classified differently depending on context.
306306
5. **Error chain works end-to-end** — extensions wrap custom errors, controllers wrap with `errs.New*Error`, and `errors.Is`/`errors.As` walks the full chain.
307+
6. **Default classifiers** — primary pipeline consumers compose one or more classifiers (each owning a focused concern such as transport-level signals or a specific driver/library's errors) into `errs.NewClassifierProcessor(...)`. Pick classifiers that match the failure surfaces the consumer actually touches; add a new classifier package when a backend introduces error shapes that no existing one understands, rather than teaching an unrelated classifier about them. DLQ reconciliation consumers use `errs.AlwaysRetryableProcessor` instead so any failure is redelivered rather than dropped.

core/errs/BUILD.bazel

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,20 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "errs",
5-
srcs = ["errs.go"],
5+
srcs = [
6+
"errs.go",
7+
"processor.go",
8+
],
69
importpath = "github.com/uber/submitqueue/core/errs",
710
visibility = ["//visibility:public"],
811
)
912

1013
go_test(
1114
name = "errs_test",
12-
srcs = ["errs_test.go"],
15+
srcs = [
16+
"errs_test.go",
17+
"processor_test.go",
18+
],
1319
embed = [":errs"],
1420
deps = [
1521
"@com_github_stretchr_testify//assert",

core/errs/README.md

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ Errors are classified along two axes:
2323
A returned `error` reaches `IsUserError` / `IsRetryable` / `IsDependencyError` carrying one of the framework types (`*userError` / `*infraError`). It gets there one of two ways:
2424

2525
1. **Explicit wrap by the controller** — the controller knows the meaning of the failure and wraps the cause with `NewUserError`, `NewRetryableError`, `NewDependencyError`, or `NewRetryableDependencyError` before returning.
26-
2. **Automatic wrap by `Classify`** — the controller returns a raw driver/library/sentinel error, and a per-backend `Classifier` recognises it later in the pipeline (typically inside the consumer) and adds the appropriate framework wrap.
26+
2. **Automatic wrap by the classifier-based `ErrorProcessor`** — the controller returns a raw driver/library/sentinel error, and a per-backend `Classifier` recognises it later in the pipeline (typically inside the consumer, after `ErrorProcessor.Process` runs) and adds the appropriate framework wrap.
2727

2828
Both routes feed the same downstream helpers; the chain that reaches `IsRetryable` looks identical regardless of who wrapped it.
2929

30-
## `Classify` and the `Classifier` Interface
30+
## `ErrorProcessor`, `Classifier`, and the Processing Pass
3131

3232
`Classifier` inspects a **single error node** and returns a `Verdict`:
3333

@@ -39,14 +39,22 @@ type Classifier interface {
3939

4040
Verdicts: `Unknown` (this node carries no signal), `User`, `Infra`, `InfraRetryable`, `InfraDependency`, `InfraDependencyRetryable`.
4141

42-
`Classify(err, classifiers...)` is the single, explicit pass that turns a raw chain into a wrapped one. It is called **exactly once per chain** — typically by the consumer immediately after the controller returns. After that point, callers use only the `IsXxx` helpers, which are pure type checks.
42+
An `ErrorProcessor` runs the per-chain pass that turns a raw chain into a wrapped one. It is called **exactly once per chain** — typically by the consumer immediately after the controller returns. After that point, callers use only the `IsXxx` helpers, which are pure type checks.
4343

44-
`Classify` walks the chain twice:
44+
Two implementations ship in this package:
4545

46-
1. **Pass 1 — framework-wrap check.** A cheap type switch looks for an existing `*userError` / `*infraError` anywhere in the chain. If found, the chain is already interpretable and `Classify` returns `err` unchanged. **No classifier is invoked.**
47-
2. **Pass 2 — classifier walk.** From outermost to innermost node, each registered classifier is asked for a verdict. The first non-`Unknown` verdict wins and `err` is wrapped with the matching framework constructor.
46+
- **`NewClassifierProcessor(classifiers...)`** — the standard pass for primary pipeline consumers. Walks the chain twice:
47+
1. **Pass 1 — framework-wrap check.** A cheap type switch looks for an existing `*userError` / `*infraError` anywhere in the chain. If found, the chain is already interpretable and the processor returns `err` unchanged. **No classifier is invoked.**
48+
2. **Pass 2 — classifier walk.** From outermost to innermost node, each registered classifier is asked for a verdict. The first non-`Unknown` verdict wins and `err` is wrapped with the matching framework constructor.
4849

49-
If no classifier recognises anything, `err` is returned unchanged — and behaves as non-retryable infra at the helper layer.
50+
If no classifier recognises anything, `err` is returned unchanged — and behaves as non-retryable infra at the helper layer.
51+
52+
- **`AlwaysRetryableProcessor`** — unconditionally wraps every non-nil error with `NewRetryableError`, overriding any inner framework wrap. Use it for narrowly-scoped consumers — typically DLQ reconciliation — that must redeliver on any failure because there is no further dead-letter destination. Side-effect: an inner `*infraError(dependency=true)` is masked by the outer `retryable=true` wrap, since `errors.As` matches the outermost `*infraError` first. This is acceptable for the intended DLQ use case where only `IsRetryable` drives transport behaviour; do not pair this processor with a primary pipeline consumer or genuine user errors will retry forever instead of reaching their DLQ.
53+
54+
### Choosing a processor
55+
56+
- **Primary pipeline consumer**`NewClassifierProcessor(...)`. Controllers' explicit `NewUserError` / `NewDependencyError` wraps must survive so user errors don't get retried, and unclassified backend errors must be inspected by the registered classifiers.
57+
- **DLQ reconciliation consumer**`AlwaysRetryableProcessor`. The DLQ is the last stop; any unprocessable message must come back for another attempt rather than silently drop. The DLQ subscription itself runs with a very high `Retry.MaxAttempts` and with its own DLQ disabled, so "always retryable + bounded-but-effectively-infinite attempts" is the convergence guarantee.
5058

5159
## Adding a Backend-Specific Classifier
5260

@@ -77,21 +85,24 @@ func (classifier) Classify(err error) errs.Verdict {
7785
}
7886
```
7987

80-
Servers wire each classifier into the consumer as a vararg. Order matters only when two classifiers might both match a node — earlier classifiers win:
88+
Servers wire each classifier into the consumer's `ErrorProcessor`. Order matters only when two classifiers might both match a node — earlier classifiers win:
8189

8290
```go
8391
import (
92+
"github.com/uber/submitqueue/core/errs"
8493
genericerrs "github.com/uber/submitqueue/core/errs/generic"
8594
mysqlerrs "github.com/uber/submitqueue/core/errs/mysql"
8695
)
8796

8897
c := consumer.New(logger, scope, registry,
89-
genericerrs.Classifier,
90-
mysqlerrs.Classifier,
98+
errs.NewClassifierProcessor(
99+
genericerrs.Classifier,
100+
mysqlerrs.Classifier,
101+
),
91102
)
92103
```
93104

94-
Tests follow the same shape: assert per-node behaviour against `Classifier.Classify(node)` directly, and assert end-to-end behaviour by running `errs.Classify(err, Classifier)` and checking the helpers (`IsRetryable`, `IsUserError`, …) on the result. See `core/errs/mysql/mysql_test.go` and `core/errs/generic/generic_test.go`.
105+
Tests follow the same shape: assert per-node behaviour against `Classifier.Classify(node)` directly, and assert end-to-end behaviour by running `errs.NewClassifierProcessor(Classifier).Process(err)` and checking the helpers (`IsRetryable`, `IsUserError`, …) on the result. See `core/errs/mysql/mysql_test.go` and `core/errs/generic/generic_test.go`.
95106

96107
## Overriding Classification from a Controller
97108

@@ -106,8 +117,9 @@ if errors.Is(err, storage.ErrNotFound) {
106117
return errs.NewUserError(fmt.Errorf("request %s: %w", id, err))
107118
}
108119
if err != nil {
109-
// Hand the raw error to Classify — the mysql classifier will recognise
110-
// deadlocks, lock-wait timeouts, etc. and wrap them as retryable infra.
120+
// Hand the raw error to the consumer's ErrorProcessor — the mysql
121+
// classifier will recognise deadlocks, lock-wait timeouts, etc. and wrap
122+
// them as retryable infra.
111123
return fmt.Errorf("get %s: %w", id, err)
112124
}
113125
```
@@ -119,7 +131,7 @@ Two practical rules fall out of the short-circuit semantics:
119131

120132
## Extensions Return Plain Go Errors
121133

122-
Extension interfaces (`MergeChecker`, `Storage`, `Publisher`) return standard `error` values. They may define their own domain-specific sentinel errors (e.g. `storage.ErrNotFound`, `storage.ErrVersionMismatch`) but they do **not** classify errors as user or infra — that is the controller's (and `Classify`'s) job.
134+
Extension interfaces (`MergeChecker`, `Storage`, `Publisher`) return standard `error` values. They may define their own domain-specific sentinel errors (e.g. `storage.ErrNotFound`, `storage.ErrVersionMismatch`) but they do **not** classify errors as user or infra — that is the controller's (and the consumer's `ErrorProcessor`'s) job.
123135

124136
This separation keeps extensions reusable across contexts. The same `storage.ErrNotFound` might be a user error in one controller (user requested a non-existent resource) and an infra error in another (expected record is missing).
125137

@@ -151,4 +163,4 @@ errors.Is(err, ErrNotFound) // true — cause is in the chain
151163
| `IsRetryable(err)` | `err` is or wraps an infra error with the retryable flag set |
152164
| `IsDependencyError(err)` | `err` is or wraps an infra error marked as dependency |
153165

154-
All three are type-only checks. They do not invoke classifiers — pair them with a preceding `Classify` call when the controller's error may not carry an explicit wrap.
166+
All three are type-only checks. They do not invoke classifiers — pair them with a preceding `ErrorProcessor.Process` call when the controller's error may not carry an explicit wrap.

core/errs/errs.go

Lines changed: 2 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ const (
143143
//
144144
// Classifiers must not call errors.As / errors.Is themselves, which would walk
145145
// the chain and could shadow a classification carried by an outer node (such
146-
// as a controller's explicit NewUserError wrap). The package-level Classify
147-
// function owns the walk.
146+
// as a controller's explicit NewUserError wrap). The classifier-based
147+
// ErrorProcessor (see NewClassifierProcessor) owns the walk.
148148
//
149149
// Classifiers are typically stateless; the canonical convention is to expose a
150150
// package-level singleton value (e.g. mysqlerrs.Classifier) rather than a
@@ -153,80 +153,6 @@ type Classifier interface {
153153
Classify(err error) Verdict
154154
}
155155

156-
// Classify is the single, explicit classification pass. It is intended to be
157-
// called exactly once per error chain — typically by the consumer immediately
158-
// after a controller returns — and produces a chain that subsequent IsUserError
159-
// / IsRetryable / IsDependencyError calls can interpret with simple type
160-
// checks (no further classifier walks).
161-
//
162-
// Semantics:
163-
//
164-
// - nil in, nil out.
165-
// - If err's chain already carries a framework classification (*userError or
166-
// *infraError anywhere in the chain), returns err unchanged — the chain is
167-
// already interpretable by IsUserError / IsRetryable / IsDependencyError.
168-
// - Otherwise, walks the chain from outermost to innermost, asking each
169-
// classifier per node. The FIRST non-Unknown verdict wins; the outermost
170-
// such node determines the wrap. err is wrapped with the framework
171-
// constructor matching that verdict (User -> NewUserError, InfraRetryable
172-
// -> NewRetryableError, etc.) and the wrapped error is returned.
173-
// - Verdict Infra means "non-retryable infra" — which is already the default
174-
// behavior for an unwrapped chain, so no wrap is added.
175-
// - If no classifier recognises anything, err is returned unchanged.
176-
//
177-
// Implementation: two passes over the chain. Pass 1 is a cheap type check
178-
// looking for an existing framework wrap and short-circuits if one is found —
179-
// no classifier is invoked. Pass 2 runs the configured classifiers per node.
180-
// Walking the chain is cheap relative to a classifier call, so this avoids
181-
// running classifiers whenever the chain is already classified deeper down.
182-
//
183-
// NOTE: this central classifier model cannot disambiguate errors of the same
184-
// underlying type produced by different extensions (e.g. a net.OpError from a
185-
// mysql connection vs the same type from an HTTP caller would both match the
186-
// mysql classifier here). Resolving that requires per-extension provenance
187-
// tagging; intentionally deferred.
188-
func Classify(err error, classifiers ...Classifier) error {
189-
if err == nil {
190-
return nil
191-
}
192-
193-
// Pass 1 — cheap framework-wrap check. If any node already carries a
194-
// framework type, the chain is interpretable as-is and classifiers are
195-
// never invoked.
196-
for cur := err; cur != nil; cur = errors.Unwrap(cur) {
197-
switch cur.(type) {
198-
case *userError, *infraError:
199-
return err
200-
}
201-
}
202-
203-
// Pass 2 — run classifiers per node from outermost to innermost. Stop at
204-
// the first non-Unknown verdict.
205-
var verdict Verdict
206-
for cur := err; cur != nil && verdict == Unknown; cur = errors.Unwrap(cur) {
207-
for _, c := range classifiers {
208-
if v := c.Classify(cur); v != Unknown {
209-
verdict = v
210-
break
211-
}
212-
}
213-
}
214-
215-
switch verdict {
216-
case User:
217-
return NewUserError(err)
218-
case InfraRetryable:
219-
return NewRetryableError(err)
220-
case InfraDependency:
221-
return NewDependencyError(err)
222-
case InfraDependencyRetryable:
223-
return NewRetryableDependencyError(err)
224-
}
225-
// Unknown or Infra — no wrap needed; the existing chain already behaves as
226-
// non-retryable infra at the IsRetryable / IsUserError layer.
227-
return err
228-
}
229-
230156
// IsUserError reports whether err is or wraps a user error, i.e. an error
231157
// produced by NewUserError. Inspects only the framework types in the chain.
232158
func IsUserError(err error) bool {

core/errs/generic/generic.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,19 @@ import (
2525

2626
// Classifier recognises generic, non-backend-specific errors and returns
2727
// errs.Unknown for anything it does not recognise so the surrounding
28-
// errs.Classify walker can keep looking down the unwrap chain.
28+
// classifier-processor walk can keep looking down the unwrap chain.
2929
//
3030
// The classifier is stateless; this package-level singleton is the canonical
31-
// handle. Pass it into consumer.New as a vararg.
31+
// handle. Pass it as one of the variadic classifiers to
32+
// errs.NewClassifierProcessor; the resulting processor is what gets handed to
33+
// consumer.New.
3234
var Classifier errs.Classifier = classifier{}
3335

3436
type classifier struct{}
3537

3638
// Classify inspects a single node. Per the errs.Classifier contract, this
37-
// must not call errors.Is / errors.As — errs.Classify owns the chain walk.
39+
// must not call errors.Is / errors.As — the classifier-processor owns the
40+
// chain walk.
3841
func (classifier) Classify(err error) errs.Verdict {
3942
// Cancellation signals that the caller aborted the work in flight
4043
// (process shutdown, deadline on the inbound RPC, parent operation gone) —
@@ -44,8 +47,8 @@ func (classifier) Classify(err error) errs.Verdict {
4447
// Cases where cancellation truly means "do not run this again" are
4548
// caller-specific and should be expressed by wrapping with an explicit
4649
// NewUserError / NewDependencyError before returning; the pass-1
47-
// framework-wrap check in errs.Classify will then short-circuit before
48-
// this classifier is consulted.
50+
// framework-wrap check in the classifier-processor will then short-circuit
51+
// before this classifier is consulted.
4952
if err == context.Canceled {
5053
return errs.InfraRetryable
5154
}

core/errs/generic/generic_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ func TestClassifier_Unknown(t *testing.T) {
3434
err error
3535
}{
3636
// Per-node contract — Classifier should NOT match a wrapped
37-
// context.Canceled; the surrounding errs.Classify walk will reach the
38-
// inner node and ask Classifier again there.
37+
// context.Canceled; the surrounding classifier-processor walk will
38+
// reach the inner node and ask Classifier again there.
3939
{"wrapped context.Canceled", fmt.Errorf("op: %w", context.Canceled)},
4040
{"deadline exceeded", context.DeadlineExceeded},
4141
{"plain error", errors.New("anything")},
@@ -49,17 +49,19 @@ func TestClassifier_Unknown(t *testing.T) {
4949
}
5050
}
5151

52-
func TestClassifier_AppliedViaClassify(t *testing.T) {
52+
func TestClassifier_AppliedViaProcessor(t *testing.T) {
53+
p := errs.NewClassifierProcessor(Classifier)
54+
5355
t.Run("bare context.Canceled becomes retryable infra", func(t *testing.T) {
54-
out := errs.Classify(context.Canceled, Classifier)
56+
out := p.Process(context.Canceled)
5557
assert.True(t, errs.IsRetryable(out))
5658
})
5759

5860
t.Run("wrapped context.Canceled becomes retryable infra", func(t *testing.T) {
5961
// The chain walker reaches the inner context.Canceled node and the
6062
// classifier matches there.
6163
wrapped := fmt.Errorf("process: %w", context.Canceled)
62-
out := errs.Classify(wrapped, Classifier)
64+
out := p.Process(wrapped)
6365
assert.True(t, errs.IsRetryable(out))
6466
})
6567

@@ -68,7 +70,7 @@ func TestClassifier_AppliedViaClassify(t *testing.T) {
6870
// The pass-1 framework-wrap check short-circuits before Classifier
6971
// runs.
7072
err := errs.NewUserError(context.Canceled)
71-
out := errs.Classify(err, Classifier)
73+
out := p.Process(err)
7274
assert.Same(t, err, out)
7375
assert.False(t, errs.IsRetryable(out))
7476
assert.True(t, errs.IsUserError(out))

0 commit comments

Comments
 (0)