diff --git a/Makefile b/Makefile
index 9b27578..2e59c63 100755
--- a/Makefile
+++ b/Makefile
@@ -3,6 +3,12 @@
test: fmt
go test -timeout=1s -short -race -covermode=atomic ./...
+test.db: test
+ go test -timeout=30s -race -covermode=atomic github.com/smarty/messaging/v3/handlers/harness/sqladapter
+
+test.db.local:
+ (docker compose -f doc/docker-compose.yml up --wait && $(MAKE) test.db --no-print-directory); docker compose -f doc/docker-compose.yml down
+
fmt:
go mod tidy && go fmt ./...
diff --git a/doc/docker-compose.yml b/doc/docker-compose.yml
new file mode 100644
index 0000000..7903147
--- /dev/null
+++ b/doc/docker-compose.yml
@@ -0,0 +1,13 @@
+services:
+ mysql:
+ image: mysql:8.0
+ environment:
+ MYSQL_ALLOW_EMPTY_PASSWORD: 1
+ MYSQL_DATABASE: messaging
+ ports:
+ - 3306:3306
+ healthcheck:
+ test: "mysql -uroot -e 'select 1'"
+ interval: 2s
+ timeout: 20s
+ retries: 10
diff --git a/sqlmq/_schema_mysql.sql b/doc/mysql/schema.sql
similarity index 100%
rename from sqlmq/_schema_mysql.sql
rename to doc/mysql/schema.sql
diff --git a/doc/work-sessions/2026/2026-05-07_13-06-13-proposal-import-harness.md b/doc/work-sessions/2026/2026-05-07_13-06-13-proposal-import-harness.md
new file mode 100644
index 0000000..2869fda
--- /dev/null
+++ b/doc/work-sessions/2026/2026-05-07_13-06-13-proposal-import-harness.md
@@ -0,0 +1,291 @@
+---
+name: Import harness + sqladapter into messaging/v3
+description: Proposal to lift the infra/harness pipeline into github.com/smarty/messaging/v3/handlers/harness and relocate the SQL-bound dispatcher/writer/recovery as a sqladapter subpackage.
+type: plot
+---
+
+# Proposal: Import `infra/harness` into `messaging/v3/handlers/harness`
+
+## Background
+
+A second project (working copy under `-context/domain-transformation-phase-9-chunk-C/code/infra`) has grown a staged, pipeline-based message-handling "harness" that we now want to promote into the shared `github.com/smarty/messaging/v3` module so it can be reused. The source split is:
+
+- `infra/harness/*` — a generic, store-and-forward pipeline built from goroutine stages connected by buffered channels. Stages: `Entrypoint → Execution → Serialization (fan-out) → Persistence → Completion → Broadcast → Terminal`. Supporting code: `fanout.go`, `pool.go`, `routing.go`, `scanner.go`.
+- `infra/*` — supporting types and a **reference implementation** of the `Writer` / `Dispatcher` interfaces plus a `Recover` function, all coupled to the same MySQL `Messages` table that `sqlmq/_schema_mysql.sql` already defines (`id`, `dispatched`, `type`, `payload`).
+
+The target module (`github.com/smarty/messaging/v3`) already has a `handlers/` namespace (`multi`, `retry`, `sqltx`, `transactional`) and reusable pipeline/streaming infrastructure (`streaming/`, `batch/`). The harness belongs there as another composable handler pattern.
+
+Decisions locked in during /plot and /replot:
+
+1. **Scope:** bring in both the generic harness and the SQL-backed reference implementation, but **invert the nesting**: harness at `handlers/harness`, SQL reference impl at `handlers/harness/sqladapter`.
+2. **Contracts:** reuse `messaging.Handler` and `messaging.Listener` from the root `contracts.go` instead of re-declaring them inside the harness package.
+3. **Serialization:** the pipeline holds an unexported `serializer` collaborator with a `Serialize(out io.Writer, in any) error` signature (close to `encoding/json/v2`'s `MarshalWrite`); the caller supplies one. The `goexperiment.jsonv2` build tag is dropped. **No serializer implementations ship with this module** — callers own encoding.
+4. **Wire-up:** `New(...)` uses the module's functional-options convention (`Options singleton` pattern — see `handlers/retry/config.go`, `batch/config.go`, `handlers/transactional/config.go`).
+5. **Test library:** bring in `github.com/smarty/gunit/v2` alongside the existing v1 dep. Both major versions coexist until a later, separate module-wide upgrade to v2.
+
+## Approach
+
+### 1. Destination layout
+
+```
+github.com/smarty/messaging/v3/
+├── contracts.go (unchanged; we'll reuse Handler, Listener, ListenCloser)
+├── handlers/
+│ ├── ...existing sub-packages...
+│ └── harness/ (NEW — generic pipeline; only New + Options are exported)
+│ ├── contracts.go (Writer, Dispatcher, Monitor — exported collaborator types; executor, applicator, serializer — unexported internals; event structs + error sentinels)
+│ ├── config.go (New(...) + Options singleton + option func + nop)
+│ ├── message.go (Message value type — lifted from infra/message.go)
+│ ├── pipeline.go (unexported build(ctx, cfg) wiring)
+│ ├── 00_entrypoint.go / _test.go (unexported entrypoint type + newEntrypoint)
+│ ├── 01_execution.go / _test.go (unexported execution + newExecution)
+│ ├── 02_serialization.go / _test.go (unexported serialization + newSerialization; no build tag)
+│ ├── 03_persistence.go / _test.go (unexported persistence + newPersistence)
+│ ├── 04_completion.go / _test.go (unexported completion + newCompletion)
+│ ├── 05_broadcast.go / _test.go (unexported broadcast + newBroadcast)
+│ ├── 06_terminal.go / _test.go (unexported terminal + newTerminal)
+│ ├── fanout.go (unexported fanIn + newFanIn + newFanOut)
+│ ├── pool.go
+│ ├── routing.go / routing_test.go (unexported router + newRouter)
+│ ├── scanner.go
+│ └── pipeline_test.go
+│ └── sqladapter/ (NEW — SQL reference impl)
+│ ├── dispatcher.go / _test.go
+│ ├── writer.go / _test.go
+│ ├── recovery.go / _test.go
+│ └── contracts.go (Logger interface)
+```
+
+### 2. Contract changes
+
+The current `infra/harness/contracts.go` declares:
+
+```go
+type Handler interface{ Handle(context.Context, ...any) }
+type Listener interface{ Listen() }
+```
+
+These are structurally identical to the ones in `messaging/contracts.go`, so we delete the local versions and have `harness.New(...)` return `messaging.Handler` + `[]messaging.Listener`. The `entrypoint`'s `Close() error` + `Listen()` also already satisfies `messaging.ListenCloser`.
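+
+A compile-time assertion is one way to keep that claim honest as the code moves. The sketch below is illustrative only: `Listener` and `ListenCloser` are local stand-ins for the `messaging` interfaces (declared here just so the snippet is self-contained), and `entrypoint` is a stub, not the real stage.
+
+```go
+package main
+
+import "fmt"
+
+// Local stand-ins for the root-package interfaces; assumptions for this sketch.
+type Listener interface{ Listen() }
+
+type ListenCloser interface {
+	Listener
+	Close() error
+}
+
+// entrypoint is a stub carrying only the two methods named above.
+type entrypoint struct{ closed bool }
+
+func (e *entrypoint) Listen() {}
+
+func (e *entrypoint) Close() error {
+	e.closed = true
+	return nil
+}
+
+// The build fails here if entrypoint ever stops satisfying ListenCloser.
+var _ ListenCloser = (*entrypoint)(nil)
+
+func main() {
+	var lc ListenCloser = &entrypoint{}
+	lc.Listen()
+	fmt.Println(lc.Close())
+}
+```
+
+In the real package the same one-line `var _ ... = (*entrypoint)(nil)` check would presumably target `messaging.ListenCloser` directly.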
+
+The package's collaborator interfaces split into two groups:
+
+- **Exported** (caller supplies a real implementation via `Options.*`): `Writer`, `Dispatcher`, `Monitor`. These are the external boundaries of the pipeline.
+- **Unexported** (internal to the package): `executor`, `applicator`, `serializer`, plus every pipeline-stage struct (`entrypoint`, `execution`, `serialization`, `persistence`, `completion`, `broadcast`, `terminal`), the `router`/`newRouter`, and the `fanIn`. The only way into the package is `New(...)` + `Options.*`. The source's `Executor` and `Applicator` become unexported because callers never instantiate them directly; the pipeline discovers them reflectively via `router` from the domain types supplied through `Options.Types(...)`.
+
+The pluggable encoder lives as an unexported interface:
+
+```go
+// serializer encodes a single message's value onto the supplied writer. The
+// signature intentionally mirrors encoding/json/v2's MarshalWrite. Implementations
+// must be safe for concurrent use — the pipeline runs multiple serialization workers.
+type serializer interface {
+ Serialize(out io.Writer, in any) error
+}
+```
+
+The `serialization` stage stops importing any encoder directly; its constructor takes a `serializer`:
+
+```go
+func newSerialization(monitor Monitor, enc serializer, input, output chan *unitOfWork) *serialization
+```
+
+**No serializer implementations ship with this module.** Callers provide their own (for example, a thin wrapper around `encoding/json/v2`'s `MarshalWrite`, `encoding/json`'s `NewEncoder(w).Encode(v)`, protobuf, etc.). Keeping the type unexported forces callers to supply the collaborator through the functional option (see §8) rather than constructing a stage directly.
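+
+As an illustration of how small such a caller-side adapter can be, here is a minimal sketch built on `encoding/json`'s `NewEncoder(w).Encode(v)`. The names `jsonSerializer`, `orderReceived`, and `encode` are hypothetical and exist only for this example; nothing like them ships with the module.
+
+```go
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+)
+
+// jsonSerializer is a hypothetical caller-owned adapter. It has the
+// Serialize(out io.Writer, in any) error shape the pipeline expects.
+type jsonSerializer struct{}
+
+// Serialize builds a fresh Encoder per call, so the adapter itself holds no
+// shared state between serialization workers.
+func (jsonSerializer) Serialize(out io.Writer, in any) error {
+	return json.NewEncoder(out).Encode(in)
+}
+
+// orderReceived is an illustrative message type.
+type orderReceived struct {
+	ID int `json:"id"`
+}
+
+// encode runs one value through the adapter and returns the encoded text.
+func encode(in any) (string, error) {
+	var buf bytes.Buffer
+	err := jsonSerializer{}.Serialize(&buf, in)
+	return buf.String(), err
+}
+
+func main() {
+	out, err := encode(orderReceived{ID: 42})
+	fmt.Printf("%q %v\n", out, err)
+}
+```
+
+Note that `Encoder.Encode` appends a trailing newline to each value; a caller who does not want that in `message.Content` could use `json.Marshal` plus `out.Write` instead.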
+
+### 3. Events and monitor
+
+The `Monitor` interface and its event types (`BatchInFlight`, `BatchComplete`, `UnitOfWorkInFlight`, `UnitOfWorkComplete`, `SerializationError`, `PersistenceError`, `BroadcastError`) move from `infra/contracts.go` to `handlers/harness/contracts.go` — all exported so callers can type-switch in their `Monitor.Track(any)` implementation. The error sentinels (`ErrSerialization`, `ErrPersistence`, `ErrBroadcast` — renamed from `ErrJSONSerialization` since we're format-agnostic) move with them. `Logger` is no longer needed inside `harness` itself (only `sqladapter` uses it); it moves into `sqladapter/contracts.go`.
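+
+A caller's `Track(any)` might type-switch along the following lines. This is a sketch under assumptions: the three event structs are empty local stand-ins (their real fields are not listed in this proposal), the events are assumed to arrive by value rather than by pointer, and `countingMonitor` is a hypothetical name. The mutex is there because several stages may report concurrently.
+
+```go
+package main
+
+import (
+	"fmt"
+	"sync"
+)
+
+// Empty stand-ins for three of the exported event types named above.
+type BatchComplete struct{}
+type SerializationError struct{}
+type PersistenceError struct{}
+
+// countingMonitor is a hypothetical Monitor that tallies events by kind.
+type countingMonitor struct {
+	mu       sync.Mutex
+	batches  int
+	failures int
+	other    int
+}
+
+// Track type-switches on the event and ignores kinds it does not know.
+func (m *countingMonitor) Track(event any) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	switch event.(type) {
+	case BatchComplete:
+		m.batches++
+	case SerializationError, PersistenceError:
+		m.failures++
+	default:
+		m.other++
+	}
+}
+
+func main() {
+	m := &countingMonitor{}
+	events := []any{BatchComplete{}, SerializationError{}, PersistenceError{}, "unknown"}
+	for _, event := range events {
+		m.Track(event)
+	}
+	fmt.Println(m.batches, m.failures, m.other)
+}
+```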
+
+### 4. `Message` type placement
+
+`infra.Message` becomes `harness.Message` at `handlers/harness/message.go`. Its fields (`ID`, `Value`, `Type`, `Content`, `ContentType`, `ContentEncoding`, `Stored`, `Dispatched`) stay intact. The SQL adapter references `harness.Message` rather than duplicating.
+
+### 5. SQL adapter (`handlers/harness/sqladapter`)
+
+Direct port of `infra/dispatcher.go`, `infra/writer.go`, `infra/recovery.go`, with these edits:
+
+- `package infra` → `package sqladapter`.
+- The `root/code/infra` import path (implied reference) is dropped; cross-package references switch to `github.com/smarty/messaging/v3/handlers/harness`.
+- `messaging.Connector` / `messaging.Dispatch` references resolve against the **same** module now (no path change needed — they already import `github.com/smarty/messaging/v3`).
+- Package doc comment labels it as a reference implementation that targets the `Messages` table defined by `sqlmq/_schema_mysql.sql` in this same module (columns `id`, `dispatched`, `type`, `payload`). That schema coupling is not new — `sqlmq/dispatch_store.go` already writes to the same table. The adapter is simply a second reader/writer of the same schema.
+- Preserve the existing `TODO` comments (double-encoding note in `dispatcher.go`, pagination note in `recovery.go`) — those are known issues, not something this import should silently fix.
+- Keep the `legacyWrite func(context.Context, *sql.Tx, ...any)` escape hatch on `Writer`, but document it as deprecated in the package comment.
+
+### 6. Test infrastructure
+
+The source tests use **gunit v2** (`github.com/smarty/gunit/v2`) along with `assert/better` + `assert/should`. The target module currently pins **gunit v1** (`github.com/smarty/gunit v1.6.0`).
+
+**Plan:** add `github.com/smarty/gunit/v2` to `go.mod` and port the tests with their v2 imports intact. The module will temporarily depend on both major versions. A later, separate initiative will migrate the rest of the module's tests from v1 to v2, at which point v1 can be dropped — that migration is out of scope for this import.
+
+If any imported test uses assertions that don't exist in v2 (or have different semantics), adapt as minimally as possible; do not rewrite assertions that already work.
+
+### 7. `go.mod` updates
+
+- Add `github.com/smarty/gunit/v2` (latest compatible version). Both v1 and v2 will be present until the future module-wide migration.
+- `go.uber.org/goleak` is in source `go.sum` — confirm during import whether any ported test actually uses it; add only if needed.
+- No other new dependencies. The harness itself needs only `io` from the standard library, and any caller demonstrations later added to the docs should stay stdlib-only.
+
+### 8. Functional-options wire-up
+
+The existing `New(ctx, monitor, executor, writer, dispatcher)` signature is replaced with the module's functional-options pattern. **Only `ctx` is positional**; every collaborator — including the domain types that drive routing — flows through options. The source `Executor`/`Applicator` interfaces become unexported (`executor`, `applicator`); the reflective `router` discovers real implementations from the domain types supplied via `Options.Types(...)` and the pipeline never asks the caller for an `Executor` directly. Following the convention in `handlers/retry/config.go` and `batch/config.go`:
+
+```go
+// handlers/harness/config.go
+package harness
+
+import (
+ "context"
+ "io"
+
+ "github.com/smarty/messaging/v3"
+)
+
+func New(ctx context.Context, options ...option) (messaging.Handler, []messaging.Listener) {
+ var cfg configuration
+ for _, apply := range Options.defaults(options...) {
+ apply(&cfg)
+ }
+ return build(ctx, cfg) // internal wiring lives in pipeline.go
+}
+
+var Options singleton
+
+type singleton struct{}
+type option func(*configuration)
+
+type configuration struct {
+ Monitor Monitor
+ Serializer serializer
+ Writer Writer
+ Dispatcher Dispatcher
+ Types []any // domain types (handlers/applicators); passed to newRouter
+ BatchCapacity int // channel buffer, default 1024
+ UnitSize int // max completions per unit, default 64
+ SerializerCount int // fan-out worker count, default 4
+}
+
+// Types registers the domain objects whose Execute.../Apply... methods drive
+// the pipeline. They are passed verbatim to newRouter(...) at build time.
+func (singleton) Types(value ...any) option { return func(c *configuration) { c.Types = value } }
+func (singleton) Monitor(value Monitor) option { return func(c *configuration) { c.Monitor = value } }
+func (singleton) Serializer(value serializer) option { return func(c *configuration) { c.Serializer = value } }
+func (singleton) Writer(value Writer) option { return func(c *configuration) { c.Writer = value } }
+func (singleton) Dispatcher(value Dispatcher) option { return func(c *configuration) { c.Dispatcher = value } }
+func (singleton) BatchCapacity(value int) option { return func(c *configuration) { c.BatchCapacity = value } }
+func (singleton) UnitSize(value int) option { return func(c *configuration) { c.UnitSize = value } }
+func (singleton) SerializerCount(value int) option { return func(c *configuration) { c.SerializerCount = value } }
+
+func (singleton) defaults(options ...option) []option {
+ var blank = nop{}
+ return append([]option{
+ Options.Monitor(blank),
+ Options.Serializer(blank),
+ Options.Writer(blank),
+ Options.Dispatcher(blank),
+ Options.BatchCapacity(1024),
+ Options.UnitSize(64),
+ Options.SerializerCount(4),
+ }, options...)
+}
+
+// nop satisfies every collaborator interface so New(...) can be called with
+// zero options and still produce a runnable (if inert) pipeline. Callers
+// override whichever collaborators they care about via Options.*.
+type nop struct{}
+
+func (nop) Track(any) {}
+func (nop) Serialize(io.Writer, any) error { return nil }
+func (nop) Write(context.Context, ...any) error { return nil }
+func (nop) Dispatch(context.Context, ...any) error { return nil }
+func (nop) Execute(any, func(...any)) {}
+```
+
+Design notes:
+
+- `ctx` stays positional because the pipeline's lifetime is tied to it (see Trade-offs); everything else is an option.
+- `Options.Types(...)` is the single entry point for registering domain behavior. `build(ctx, cfg)` constructs the internal router via `newRouter(cfg.Types...)` and passes it into the `execution` stage as its `executor` collaborator. Callers never hold a reference to the router.
+- `Monitor`, `Serializer`, `Writer`, and `Dispatcher` all default to a shared `nop{}` implementation. A caller can invoke `New(ctx)` with zero options and get a runnable, inert pipeline; real deployments override whichever collaborators they care about via `Options.*`. This matches the defaulting style in `handlers/retry` and `handlers/transactional` (where `Logger` and `Monitor` default to nop).
+- Buffer/worker tunables, previously hard-coded, are now options with the existing values as defaults.
+- `build(ctx, cfg)` internally performs the current `New(...)`'s channel wiring; it lives in `pipeline.go` and stays unexported.
+- Every pipeline-stage struct and its constructor is unexported (`entrypoint`/`newEntrypoint`, `execution`/`newExecution`, `serialization`/`newSerialization`, `persistence`/`newPersistence`, `completion`/`newCompletion`, `broadcast`/`newBroadcast`, `terminal`/`newTerminal`, `fanIn`/`newFanIn`/`newFanOut`, `router`/`newRouter`). The only way into the package is `New(...)` + `Options.*`.
+
+### 9. Alternatives considered
+
+- **Flat layout, no sqladapter split.** Rejected: conflates a generic pipeline with a schema-coupled reference impl; callers targeting a different schema would be forced to copy-paste rather than compose.
+- **Top-level `/harness` package (sibling to `handlers/`).** Rejected: the user prefers nesting under `handlers/` where similar composable handler patterns live.
+- **Absorb `Message`, `Monitor`, events into the root `messaging` package.** Rejected: these are harness-specific concepts (the root `Delivery`/`Dispatch` types are the message abstraction for the rest of the module); promoting them would blur the boundary.
+- **Keep `jsonv2` build tag.** Rejected in favor of pluggable serialization. A caller who wants jsonv2 writes a short adapter; the pipeline stays encoder-agnostic.
+- **Ship a JSON serializer helper subpackage.** Rejected per /replot: this module deliberately provides no serializer implementations. Callers own encoding.
+- **Positional-argument constructor.** Rejected in favor of functional options for consistency with `retry`, `batch`, and `transactional`.
+- **Expose stage types / `Executor` / `Router`.** Rejected: callers have no reason to construct a single stage or wire their own router. Keeping everything except `New`, `Options`, `Message`, the collaborator interfaces, and the event/error types unexported narrows the supported surface and lets us refactor internals without breaking users.
+- **Positional `executor` argument.** Rejected: the source's `Executor` interface is really just "anything whose `Execute...(msg, broadcast)` methods the reflective router can see." Exposing a positional `Executor` parameter implied that callers build their own; `Options.Types(...)` more honestly describes the actual input (a list of domain objects) and lets the router stay private.
+
+## Trade-offs & Risks
+
+- **Context handling.** `Persistence` and `Broadcast` both capture the `ctx` supplied to `New(...)` and use it for every downstream Write/Dispatch call — so the pipeline's lifetime is tied to a single root context. This matches the source but is worth confirming explicitly; if the v3 conventions expect per-handle contexts to flow through, the stage signatures need adjustment. **Open question** for the user — proposal currently preserves source behavior.
+- **Retry-forever semantics.** Persistence and Broadcast retry indefinitely with a 1-second sleep (and TODOs for exponential backoff). Imported as-is. `handlers/retry` already exists in the target module and uses more sophisticated logic; over time we may want to delegate to it, but not in this import.
+- **Channel capacity defaults.** Back-pressure into the entrypoint's `Handle` (which holds a read lock while sending) can block callers under heavy load. Now tunable via `Options.BatchCapacity`, but the default of 1024 should be documented with this characteristic.
+- **`reflect`-based routing.** The internal router's `Execute` mutates a shared `exclusions` slice without locking — fine in the current pipeline (routing runs inside `execution` which a single goroutine calls per message). Because the router is now unexported and constructed once per `New(...)` call, the sharing concern goes away. Add a doc comment on the internal type anyway so future contributors don't regress this invariant.
+- **Dual gunit majors.** Temporarily carrying both `github.com/smarty/gunit` and `github.com/smarty/gunit/v2` increases `go.sum` noise and means new tests in *this* import use v2 while older tests elsewhere in the module still use v1. The divergence is explicit and scoped; the follow-up module-wide upgrade resolves it.
+- **Silent-by-default collaborators.** Because `Serializer`, `Writer`, and `Dispatcher` all default to a `nop{}` that returns `nil`, forgetting to wire a real implementation produces a pipeline that silently drops work rather than surfacing an error. Likewise, forgetting `Options.Types(...)` produces a pipeline whose router has no registered handlers. Callers must remember to set these. Mitigate by documenting this in the package doc comment and the `New(...)` godoc, and by making the `Options.*` entries the obvious next step in any example.
+
+## Implementation Checklist
+
+### Phase 1: Scaffolding and contracts
+
+- [x] Create directory `handlers/harness/` and `handlers/harness/sqladapter/`.
+- [x] Add `github.com/smarty/gunit/v2` to `go.mod`; run `go mod tidy`. (Temporarily removed by tidy with no importers; will return automatically once Phase 2 tests are ported.)
+- [x] Write `handlers/harness/contracts.go` with the **exported** surface (`Writer`, `Dispatcher`, `Monitor`, event structs `BatchInFlight`, `BatchComplete`, `UnitOfWorkInFlight`, `UnitOfWorkComplete`, `SerializationError`, `PersistenceError`, `BroadcastError`, sentinels `ErrSerialization`, `ErrPersistence`, `ErrBroadcast`) plus the **unexported** internal interfaces (`executor`, `applicator`, `serializer`) and value types (`batch`, `unitOfWork`).
+- [x] Write `handlers/harness/message.go` — copy `Message` struct with doc comments intact.
+- [x] Write `handlers/harness/pool.go` — copy verbatim (no external deps).
+- [x] Write `handlers/harness/scanner.go` — copy verbatim (signatures adjusted to use unexported `executor`/`applicator`).
+- [x] Write `handlers/harness/fanout.go` — `fanIn` / `newFanIn` / `newFanOut` stay unexported; `stationFactory` now returns `messaging.Listener` and uses unexported `unitOfWork`.
+- [x] Run `make compile` — confirm the package compiles (no tests yet).
+
+### Phase 2: Port stages bottom-up, TDD each one
+
+Work stage-by-stage from the terminal stage (simplest) upward, since downstream stages have no dependencies on upstream stages. All tests use `gunit/v2` imports as-is from source. **All stage types and constructors are renamed to unexported forms** (`Terminal` → `terminal`, `NewTerminal` → `newTerminal`, etc.); since the tests live in the same package, `_test.go` files can see them.
+
+- [x] Copy `06_terminal_test.go` from source; rename referenced types to lowercase. Run the terminal test — expect **failure** (`terminal` type doesn't exist yet in this package).
+- [x] Port `06_terminal.go` as `terminal` / `newTerminal`. Run tests; confirm passing.
+- [x] Copy `04_completion_test.go`; lowercase the type references. Run — expect failure.
+- [x] Port `04_completion.go` as `completion` / `newCompletion`. Run tests; confirm passing.
+- [x] Copy `05_broadcast_test.go`; lowercase the type references. Run — expect failure.
+- [x] Port `05_broadcast.go` as `broadcast` / `newBroadcast`. Run; confirm passing.
+- [x] Copy `03_persistence_test.go`; lowercase the type references. Run — expect failure.
+- [x] Port `03_persistence.go` as `persistence` / `newPersistence`. Run; confirm passing.
+- [x] **Rewrite** `02_serialization_test.go` to use a fake `serializer` (not jsonv2). Test should cover: success path writes to `message.Content`; serializer error is reported via monitor as `SerializationError` with `ErrSerialization` wrapped. Run — expect failure.
+- [x] Port `02_serialization.go` as `serialization` / `newSerialization` with the new `Serialize(io.Writer, any) error` signature; drop the `//go:build goexperiment.jsonv2` tag. Run; confirm passing.
+- [x] Copy `01_execution_test.go`; lowercase the type references. Run — expect failure.
+- [x] Port `01_execution.go` as `execution` / `newExecution`. Run; confirm passing.
+- [x] Copy `00_entrypoint_test.go`; lowercase the type references. Confirm it exercises `Close` + `Listen` semantics. Run — expect failure.
+- [x] Port `00_entrypoint.go` as `entrypoint` / `newEntrypoint`. Run; confirm passing.
+
+### Phase 3: Routing, pipeline, and functional-options config
+
+- [x] Copy `routing_test.go`; lowercase references (`Router` → `router`, `NewRouter` → `newRouter`). Run — expect failure.
+- [x] Port `routing.go` as `router` / `newRouter` with unexported `executor` / `applicator` interfaces; update `scanner.go`'s signature to match. Run; confirm passing.
+- [x] Write `handlers/harness/pipeline.go` as the unexported `build(ctx, cfg)` function; it constructs all the channels, calls `newRouter(cfg.Types...)`, wires every stage, and returns `messaging.Handler` + `[]messaging.Listener`.
+- [x] Write `handlers/harness/config.go` with `New(ctx, options...)` (no positional executor), `Options singleton`, `option` type, `configuration` struct (including `Types []any`), and per-option setters per §8, including `Options.Types(...)`.
+- [x] Write a `config_test.go` that asserts defaults (`BatchCapacity=1024`, `UnitSize=64`, `SerializerCount=4`, and that `Monitor`, `Serializer`, `Writer`, and `Dispatcher` all default to the shared `nop{}` and behave inertly when the pipeline runs with no options supplied). Also assert that `Options.Types(...)` populates `configuration.Types` verbatim.
+- [x] Adapt the source `pipeline_test.go` to call the new functional-options `New(...)`: the fixture registers itself via `Options.Types(this)` (it implements the `Execute...` method), and supplies fake `Writer`/`Dispatcher`/`serializer`/`Monitor` via `Options.*`. Run — expect failure if anything is still misaligned, then make green.
+- [x] Run the full harness test suite with `-race`; confirm no goroutine leaks and all tests pass.
+
+### Phase 4: SQL adapter
+
+- [x] Copy `infra/dispatcher_test.go` to `handlers/harness/sqladapter/dispatcher_test.go`. Rewrite imports (`package infra` → `sqladapter`, `infra.Message` → `harness.Message`; gunit imports stay v2). Also added `testdb_test.go` with local `openTestDatabase`/`ensureDatabaseReadiness` helpers (pointing at `sqlmq/_schema_mysql.sql`) since the source's `db-connector` dep is out of scope for this module. Run — expect failure.
+- [x] Port `dispatcher.go` to `sqladapter/dispatcher.go`. Add package-level doc comment labeling it as a reference implementation targeting the `Messages` table defined by `sqlmq/_schema_mysql.sql`. Preserve existing TODOs. Run; confirm passing.
+- [x] Copy `writer_test.go` → `sqladapter/writer_test.go`, rewrite imports. Replaced the external `billing` registry types and `openTestDatabase` dep with local test-only `orderReceived`/`orderApproved` structs and shared helpers from `testdb_test.go`. Run — expect failure.
+- [x] Port `writer.go`. Preserve `legacyWrite` escape hatch with a deprecation note in the godoc. Run; confirm passing.
+- [x] Copy `recovery_test.go` → `sqladapter/recovery_test.go`, rewrite imports. Run — expect failure.
+- [x] Port `recovery.go`. Preserve existing TODOs about pagination and Listener conversion. Run; confirm passing.
+- [x] Add `sqladapter/contracts.go` with the `Logger` interface (moved from `infra/contracts.go`). (Added earlier alongside dispatcher.go so it would compile.)
+
+### Phase 5: Module hygiene
+
+- [x] Run `make test` — full module test suite with `-race -covermode=atomic`. Confirm green. (All packages pass; harness 99.5%, sqladapter 0% under `-short` since its integration tests require a live MySQL — they pass end-to-end against a local DB when run without `-short`.)
+- [x] Run `go mod tidy`. Confirms two new direct deps: `github.com/smarty/gunit/v2` (planned) and `github.com/go-sql-driver/mysql` (test-only driver added during Phase 4 when we chose not to bring in `db-connector` — imported via `testdb_test.go`'s `_` alias, so it participates only in test builds). One new indirect: `filippo.io/edwards25519` (transitively required by the MySQL driver).
+- [x] Inspect `go.sum` diff — only `filippo.io/edwards25519` and gunit/v2 additions, both explained by the items above.
+- [x] Grep the new packages for references to `root/code/infra` — confirm zero.
+- [x] Grep the new packages for any `goexperiment.jsonv2` build tags — confirm zero.
+- [x] Add short package-level doc comments (`// Package harness provides a staged, store-and-forward message-handling pipeline...` etc.) on `harness` and `sqladapter`. (`harness` on config.go, `sqladapter` on dispatcher.go.)
+- [x] Self-review diff for any stray `package infra` / wrong package declarations, unused imports, and leftover `TODO: pool ...` comments that should stay vs. be addressed now (keep them — they're load-bearing signals for future work).
\ No newline at end of file
diff --git a/doc/work-sessions/2026/2026-05-14_pipeline-component-diagram.svg b/doc/work-sessions/2026/2026-05-14_pipeline-component-diagram.svg
new file mode 100644
index 0000000..0f4ff08
--- /dev/null
+++ b/doc/work-sessions/2026/2026-05-14_pipeline-component-diagram.svg
@@ -0,0 +1,236 @@
+
+
diff --git a/doc/work-sessions/2026/2026-05-28_15-30-32-proposal-harness-resilience-module-changes.md b/doc/work-sessions/2026/2026-05-28_15-30-32-proposal-harness-resilience-module-changes.md
new file mode 100644
index 0000000..3a41fbf
--- /dev/null
+++ b/doc/work-sessions/2026/2026-05-28_15-30-32-proposal-harness-resilience-module-changes.md
@@ -0,0 +1,669 @@
+---
+name: Harness pipeline resilience (module-local changes)
+description: Implement the messaging/v3 harness side of the cross-repo "harness resilience and idempotency" proposal — a void context-honoring HTTP entrypoint (unexported await) alongside today's Handle, a pre-flight admission gate (unexported admit) plus an in-module net/http shedding middleware, a thin AsHTTPHandler decorator so HTTP shells need no changes, and split channel-buffer sizing (BatchCapacity vs UnitCapacity). Excludes per-service route wireup and post-deploy observation, which belong to the consuming repos.
+type: plot
+---
+
+# Proposal: Harness Pipeline Resilience — Module-Local Changes
+
+## Background
+
+The cross-repo proposal at
+`billing-context/.../2026-05-15_23-02-32-proposal-harness-resilience-and-idempotency.md`
+describes three operational changes layered on top of Chunk C of the incremental
+domain transformation. The bulk of those changes live entirely inside this
+module (`github.com/smarty/messaging/v3`); the only per-consumer work that
+remains is the mechanical wrapping of each mutating route, which lives in the
+~10 `*-context` services and is out of scope here.
+
+This proposal scopes the work to just the parts that land in *this* repo:
+
+- All edits under `handlers/harness/` (entrypoint, contracts, config, pipeline,
+ fanout, the new admission middleware + decorator, and their tests).
+- A documentation update to `doc/work-sessions/2026/2026-05-14_pipeline-component-diagram.svg`
+ to reflect the split capacity knobs, the admission gate, and the new monitor
+ observations.
+
+The per-service route wireup (calling `AsHTTPHandler` and `Admission` from each
+service's routes table), integration tests, and post-deploy observation steps
+from the parent proposal happen in each consuming repo *after* this repo's
+changes merge and a tagged version of `messaging/v3` is published. They are
+explicitly **out of scope** for this proposal.
+
+### Why the changes are needed (recap)
+
+Two operational concerns surfaced after Chunks A and B of the incremental
+domain transformation merged:
+
+1. **HTTP requests stack up indefinitely during a database/RabbitMQ outage.**
+ `entrypoint.Handle` blocks on a per-call `sync.WaitGroup` until the
+ Completion stage fires; the caller's `context.Context` is captured into
+ `*batch` but never observed. Even when the HTTP client's deadline passes or
+ the load balancer cancels the request, the goroutine stays parked.
+2. **The pipeline's six channels are all sized to one knob (`BatchCapacity`,
+ default 1024).** During an outage, this lets tens of thousands of in-memory
+ domain mutations sit between Apply and durable storage. The in-memory
+ `Domain` drifts far ahead of what was ever stored, surfacing as "we said yes
+ to the client, then forgot" on restart.
+
+### Why a *void* HTTP entrypoint (the central design constraint)
+
+An earlier draft gave the HTTP entrypoint the signature
+`HandleResult(ctx, message any) HandleOutcome`, returning an enum the caller
+mapped to `503`/`504`. That return value would force **every** mutating HTTP
+route to branch on the outcome — and to add the corresponding test cases to
+maintain coverage. Across the ~10 `*-context` services there are **70+
+mutating routes**; replicating outcome-mapping logic (and its tests) at each
+one is exactly the kind of accumulated, duplicated branching we want to avoid.
+
+Two observations dissolve the need for a return value:
+
+- **Shedding is a *pre-flight* decision, not a *result*.** It can be decided
+ before the route handler ever runs. A small `admit() bool` predicate plus a
+ single HTTP middleware (written once, in this module) writes the `503` and
+ short-circuits the route. The route handler — and its tests — never see it.
+- **A departed caller does not need a status override.** When the caller's
+ `ctx` fires mid-flight, the goroutine simply unblocks (fixing the leak in
+ concern #1) and the route writes its normal response to a client that is
+ already gone. No `504`, no buffering of the response, no extra machinery. The
+ in-flight batch keeps processing and durably stores regardless.
+
+That leaves the HTTP entrypoint **void**, identical in spirit to today's
+`Handle`. The harness-side fix has these independently-mergeable pieces:
+
+1. **A void, context-honoring HTTP entrypoint** — an unexported
+ `await(ctx, message any)` alongside the existing
+ `Handle(ctx, messages ...any)`. `await` honors `ctx.Done()`, processes
+ exactly one message, emits a `CallerDeparted` observation when the caller
+ leaves, and returns nothing.
+2. **Pre-flight admission** — an unexported `admit() bool` predicate on the
+ entrypoint (high-watermark check against the `batches` channel, used only by
+ the in-package middleware) and an in-module `Admission` HTTP middleware that
+ writes a `503` (inline, raw `net/http`) when the gate refuses. A thin
+ `AsHTTPHandler` decorator adapts `await` to the `messaging.Handler` interface
+ the HTTP shells already depend on, so neither the shells nor their tests
+ change.
+3. **Split channel-buffer sizing** — `BatchCapacity` continues to size the
+ caller-side `batches` channel; a new `UnitCapacity` (default 1) sizes
+ `work1`–`work5` and the per-worker fan-out outputs.
+
+The companion `AdjustOrder` domain idempotency change (separate proposal,
+already merged in the consuming repos) makes it safe for `await` to return
+early when the caller's `ctx` fires: the in-flight batch keeps processing and
+durably stores, and a client retry collapses to a no-op once the original batch
+has persisted.
+
+## Approach
+
+### Decision summary
+
+Three methods on `*entrypoint` (two of them unexported), plus two thin
+module-local adapters that are the *only* new exported surface:
+
+| Method | Visibility | Caller | Honors `ctx.Done()` | Sheds | Return | Arity |
+|-------------------------------------|------------|----------------------|---------------------|---------|--------|-------------|
+| `Handle(ctx, messages ...any)` | exported | MQ, cron | No | No | none | variadic |
+| `await(ctx, message any)` | in-package | HTTP (via adapter) | Yes | No | none | exactly one |
+| `admit() bool` | in-package | HTTP middleware | n/a | Decides | `bool` | n/a |
+
+| Adapter (this module, exported) | Role |
+|------------------------------------------------------------|--------------------------------------------------------------------------------------------|
+| `AsHTTPHandler(messaging.Handler) messaging.Handler` | Wraps the handler's `await` so HTTP shells keep depending on `messaging.Handler` (zero shell change) |
+| `Admission(messaging.Handler, http.Handler) http.Handler` | Pre-flight gate; writes inline `503` when `admit()` is false |
+
+`await`, `admit`, and the `awaiter`/`admitter` interfaces the adapters assert
+against are all **unexported**. Because both the middleware and the decorator
+live in this package, the consumer never names them — `AsHTTPHandler(handler)`
+and `Admission(handler, shell)` are the entire integration vocabulary, and both
+take/return the standard `messaging.Handler`/`http.Handler` types.
+
+`await` takes a single `message any` (not variadic) because every HTTP route in
+every consuming service invokes the domain with exactly one command per
+request. Constraining the signature here:
+
+- Eliminates the empty-slice / multi-message edge cases on the HTTP path.
+- Tightens the worst-case in-memory work-in-progress bound: combined with
+ `UnitCapacity=1`, each in-flight HTTP request contributes exactly one input
+ message to a batch. The `batches` channel capacity now corresponds directly
+ to a count of HTTP commands enqueued rather than a count of caller
+ invocations of arbitrary size.
+- Surfaces the asymmetry plainly — MQ/cron may legitimately deliver multiple
+ events per call (broker batch deliveries); HTTP does not.
+
+Two new monitor observations:
+
+- `LoadShed{}` — emitted by `admit()` when it refuses on the high-watermark
+ check. (Refusal because the pipeline is `closed` is shutdown, not load, and
+ emits nothing.)
+- `CallerDeparted{}` — emitted by `await` when the caller's `ctx` fired before
+ completion (whether during enqueue or during the wait).
+
+Two new configuration options with defaults:
+
+- `Options.UnitCapacity(int)` — default `1`. Sizes `work1`–`work5` and the
+ per-worker fan-out outputs.
+- `Options.ShedThreshold(float64)` — default `0.80`. Fraction of the `batches`
+ channel capacity at or past which `admit()` refuses.
+
+### Detailed design
+
+#### 1. Shared helpers (pure refactor of today's `Handle`)
+
+Today's `00_entrypoint.go:29` has a single `Handle` whose body inlines waiter
+acquisition, batch allocation, completion-callback wiring, the
+admission-under-RWMutex sequence, and the wait. The split extracts three
+private helpers: `prepare` and `abandon` are shared by `Handle` and `await`,
+while `waiterDone` is needed only by `await` (`Handle` still calls
+`waiter.Wait()` directly). `prepare` keeps its variadic `...any` shape so
+`Handle` passes its argument through verbatim; `await` calls
+`prepare(ctx, message)` with its single message, which Go packs into a
+one-element slice at the call site.
+
+```go
+// prepare acquires a waiter, allocates a *batch from the pool, and wires up
+// the completion callback. Returns the items the caller will need.
+func (this *entrypoint) prepare(ctx context.Context, messages ...any) (waiter *sync.WaitGroup, item *batch) {
+ waiter = this.waiters.Get()
+ waiter.Add(1)
+ item = this.batches.Get()
+ item.ctx = ctx
+ item.messages = messages
+ item.complete = func() {
+ waiter.Done()
+ this.monitor.Track(batchComplete)
+ this.batches.Put(item)
+ }
+ return waiter, item
+}
+
+// abandon releases waiter and pool entry when the item was never enqueued
+// (i.e. complete() will never fire).
+func (this *entrypoint) abandon(waiter *sync.WaitGroup, item *batch) {
+ waiter.Done()
+ this.batches.Put(item)
+}
+
+// waiterDone wraps waiter.Wait() in a chan struct{} so it's select-able.
+func (this *entrypoint) waiterDone(waiter *sync.WaitGroup) (done chan struct{}) {
+ done = make(chan struct{})
+ go func() { waiter.Wait(); close(done) }()
+ return done
+}
+```
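+
+To make the `waiterDone` idea concrete, here is a minimal standalone sketch of
+selecting between a `sync.WaitGroup` and a caller's `ctx`. The `waitOrDepart`
+helper is hypothetical (it is not part of the harness); it only illustrates the
+wrap-`Wait`-in-a-channel shape shown above.
+
+```go
+package main
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+)
+
+// waitOrDepart is a hypothetical helper, not harness code. It reports true
+// when the WaitGroup finished first and false when ctx fired first.
+func waitOrDepart(ctx context.Context, waiter *sync.WaitGroup) bool {
+	done := make(chan struct{})
+	// The goroutine parks until the counter reaches zero, then closes done.
+	go func() { waiter.Wait(); close(done) }()
+	select {
+	case <-done:
+		return true
+	case <-ctx.Done():
+		return false
+	}
+}
+
+func main() {
+	var finished sync.WaitGroup // counter is zero, so Wait returns at once
+	fmt.Println(waitOrDepart(context.Background(), &finished))
+
+	var pending sync.WaitGroup
+	pending.Add(1) // simulates a batch the pipeline has not completed yet
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
+	defer cancel()
+	fmt.Println(waitOrDepart(ctx, &pending))
+	pending.Done() // lets the parked helper goroutine exit
+}
+```
+
+As in the harness, the helper goroutine outlives a departed caller and exits
+only once the counter reaches zero.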
+
+#### 2. Path A — `Handle` (MQ and cron): preserves today's contract verbatim
+
+```go
+func (this *entrypoint) Handle(ctx context.Context, messages ...any) {
+ waiter, item := this.prepare(ctx, messages...)
+ defer this.waiters.Put(waiter)
+
+ this.lock.RLock()
+ if this.closed {
+ this.lock.RUnlock()
+ this.abandon(waiter, item)
+ return
+ }
+ this.work <- item
+ this.monitor.Track(batchInFlight)
+ this.lock.RUnlock()
+
+ waiter.Wait()
+}
+```
+
+Properties:
+- **No `ctx.Done()` honoring.** MQ deliveries don't carry a client deadline;
+ cron has its own scheduler-level guard. Returning early would cause
+ `streaming` to ack work the pipeline never finished.
+- **No load-shed.** Sending to `this.work` is a blocking channel send.
+ Back-pressure propagates to the broker via prefetch limits and unacked
+ counts.
+- **The only "shed" condition is pipeline shutdown** (`this.closed`) — exactly
+ today's behavior.
+
+#### 3. Path B — `await` (HTTP): void, context-honoring, single message
+
+```go
+func (this *entrypoint) await(ctx context.Context, message any) {
+ waiter, item := this.prepare(ctx, message)
+ defer this.waiters.Put(waiter)
+
+ this.lock.RLock()
+ if this.closed {
+ this.lock.RUnlock()
+ this.abandon(waiter, item)
+ return
+ }
+ select {
+ case this.work <- item:
+ this.monitor.Track(batchInFlight)
+ this.lock.RUnlock()
+ case <-ctx.Done():
+ this.lock.RUnlock()
+ this.abandon(waiter, item)
+ this.monitor.Track(callerDeparted)
+ return
+ }
+
+ select {
+ case <-this.waiterDone(waiter):
+ case <-ctx.Done():
+ this.monitor.Track(callerDeparted)
+ }
+}
+```
+
+Properties:
+- **Void.** No outcome to return — shedding is handled pre-flight by the
+ middleware (see §4), and a departed caller just unblocks.
+- **Single message per call.**
+- **Honors `ctx.Done()` in both the enqueue and the wait.** This is what fixes
+ the indefinite-stacking concern: an HTTP request whose client deadline passes
+ (or whose load balancer cancels) unblocks promptly instead of parking.
+- **No hard-full backstop.** The ctx-honoring send already bounds the enqueue
+ wait, so there is no need for a non-blocking `default` arm — and dropping it
+ avoids a silent post-admit shed that would leave `command.Result` zero and
+ cause the shell to emit a wrong status (e.g. 404). `admit()` is the watermark
+ gate; the ctx-honoring send is the backstop.
+
+**Pool-entry lifecycle:**
+- Success / wait-departed path: the batch was enqueued, so the pipeline owns it
+ and will invoke `item.complete()` (which `Put`s it). `await` must **not**
+ `Put` — the pool would receive the same item twice. (On wait-departure the
+ pipeline still completes the batch; only the HTTP goroutine returns early.)
+- Enqueue-departed and closed paths: the item was never enqueued, so
+ `complete()` will never fire; `abandon(waiter, item)` does the cleanup.
+
+**Note — why `complete()` owns the `Put`, not the entrypoint.** A tempting
+simplification is to have `complete()` only release the waiter and let the
+entrypoint `Put` the batch once its own wait returns. That works for `Handle`
+(which always waits for completion) but is *incorrect* for `await`: on the
+wait-departed path `await` returns on `ctx.Done()` **before** completion, so an
+entrypoint-side `Put` would either be skipped, leaking the pooled `*batch`, or
+run too early, handing the pool an item the pipeline is still processing and
+will later touch in `complete()`. Completion-owned `Put` is precisely what lets a single
+cleanup rule cover both "caller waited" and "caller departed but the pipeline
+finished later." The entrypoint only `Put`s — via `abandon` — on the paths
+where the batch was *never* enqueued and `complete()` will therefore never fire.
+So the current split isn't just cleaner; it's the only correct allocation of
+the `Put`.
+
+**Critically, the batch is not abandoned by the pipeline when the caller
+departs.** When `ctx` fires after enqueue, the in-flight batch keeps
+processing; `complete()` still fires; persistence still happens. Only the HTTP
+caller's goroutine returns early.
+
+#### 4. Pre-flight admission: `admit()` + `Admission` middleware + `AsHTTPHandler`
+
+The high-watermark check lives in a side-effect-light predicate on the
+entrypoint:
+
+```go
+func (this *entrypoint) admit() bool {
+ this.lock.RLock()
+ defer this.lock.RUnlock()
+ if this.closed {
+ return false
+ }
+ if float64(len(this.work))/float64(cap(this.work)) >= this.shedThreshold {
+ this.monitor.Track(loadShed)
+ return false
+ }
+ return true
+}
+```
+
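+`Options.ShedThreshold(value float64)` exposes the threshold; default `0.80`.
+Setting it above `1.0` disables high-watermark shedding (only the closed-pipeline
+refusal remains). At exactly `1.0` the gate still refuses while the channel is
+completely full, because the comparison in `admit()` is `>=`.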
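+
+The watermark comparison can be checked in isolation. The sketch below copies
+only the ratio test from `admit()` into a hypothetical `refuses` function (the
+name and the standalone form are assumptions for illustration) and evaluates it
+at a few fill levels.
+
+```go
+package main
+
+import "fmt"
+
+// refuses is a hypothetical standalone copy of the ratio test in admit():
+// it reports whether a channel holding `queued` of `capacity` items is at or
+// past the shed threshold.
+func refuses(queued, capacity int, threshold float64) bool {
+	return float64(queued)/float64(capacity) >= threshold
+}
+
+func main() {
+	fmt.Println(refuses(7, 10, 0.80))  // below the watermark
+	fmt.Println(refuses(8, 10, 0.80))  // exactly at the watermark
+	fmt.Println(refuses(10, 10, 1.0))  // completely full, threshold exactly 1.0
+	fmt.Println(refuses(10, 10, 2.0))  // threshold above 1.0 is never reached
+}
+```
+
+Under this comparison the four calls print `false`, `true`, `true`, `false`: a
+threshold of exactly `1.0` still refuses a completely full channel, while any
+value above `1.0` can never be reached.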
+
+The middleware and the decorator live in this module (new file
+`handlers/harness/admission.go`) and depend only on the standard library and
+`messaging/v3`. The `awaiter`/`admitter` interfaces are unexported; the adapters
+accept the standard `messaging.Handler` returned by `New(...)` and assert it to
+those interfaces internally (a failed assertion is a wireup-time programming
+error and panics fast). The `503` response is flushed inline with raw
+`net/http`:
+
+```go
+type (
+ admitter interface {
+ admit() bool
+ }
+ awaiter interface {
+ await(ctx context.Context, message any)
+ }
+)
+
+// Admission refuses overloaded requests before the wrapped handler runs,
+// writing an inline 503. Wrap each mutating route with it.
+func Admission(handler messaging.Handler, inner http.Handler) http.Handler {
+ gate := handler.(admitter)
+ return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
+ if gate.admit() {
+ inner.ServeHTTP(response, request)
+ return
+ }
+ response.Header().Set("Content-Type", "application/json; charset=utf-8")
+ response.Header().Set("Retry-After", "1")
+ response.WriteHeader(http.StatusServiceUnavailable)
+ _, _ = response.Write(shedResponseBody)
+ })
+}
+
+// AsHTTPHandler adapts the void, context-honoring await to the
+// messaging.Handler the HTTP shells already depend on, so no shell (and no
+// shell test) changes.
+func AsHTTPHandler(handler messaging.Handler) messaging.Handler {
+ return &httpAdapter{target: handler.(awaiter)}
+}
+
+type httpAdapter struct {
+ target awaiter
+}
+
+func (this *httpAdapter) Handle(ctx context.Context, messages ...any) {
+ for _, message := range messages {
+ this.target.await(ctx, message)
+ }
+}
+
+var shedResponseBody = []byte(`{"errors":[{"message":"service overloaded"}]}`)
+```
+
+**Why this keeps the route handlers (and their tests) untouched.** Each
+consuming service's HTTP shells already build a command, call
+`handler.Handle(ctx, command)` on a `messaging.Handler`, and map the *mutated
+command's* result field (`command.Result`) to its response — they read no
+return value from `Handle` today. Two seams preserve that exactly:
+
+- `AsHTTPHandler(handler)` is substituted for the raw handler when the write
+ shells are constructed, so `Handle` now routes through `await` (ctx-honoring,
+ single-message) without the shell knowing.
+- `Admission(handler, shell)` wraps each mutating route in the routes table, so
+ the `503` is decided before the shell runs.
+
+Illustrative consumer wireup (out of scope, shown for context only):
+
+```go
+handler, listeners := harness.New(ctx, opts...)
+httpHandler := harness.AsHTTPHandler(handler)
+// ...
+{"POST /admin/orders", harness.Admission(handler, NewAdminApproveOrderShell(httpHandler))},
+{"PUT /admin/accounts/:account/orders/:order/adjustments", harness.Admission(handler, NewAdminAdjustOrderShell(httpHandler))},
+```
+
+The consumer performs no type assertions of its own — it passes the
+`messaging.Handler` from `New(...)` straight into both adapters. Each of the 70+
+mutating routes across the ~10 services becomes a mechanical one-line wrap — no
+per-route outcome logic, no per-route tests. The `503`/departed behavior is
+tested **once**, here, against the middleware and the decorator.
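+
+A sketch of how the refusal and pass-through paths could be exercised with
+`net/http/httptest`. The `gate` type, the lowercase `admission` function, and
+the `serve` helper are simplified local stand-ins (assumptions for
+illustration) for the unexported `admitter` interface and the `Admission`
+middleware; the JSON body and `Content-Type` header are omitted for brevity.
+
+```go
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+)
+
+// gate is a hypothetical stand-in for the unexported admitter interface.
+type gate func() bool
+
+// admission is a simplified local copy of the middleware's shape: it runs the
+// inner handler when the gate admits and writes a 503 otherwise.
+func admission(admit gate, inner http.Handler) http.Handler {
+	return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
+		if admit() {
+			inner.ServeHTTP(response, request)
+			return
+		}
+		response.Header().Set("Retry-After", "1")
+		response.WriteHeader(http.StatusServiceUnavailable)
+	})
+}
+
+// serve sends one request through a wrapped recording shell and reports the
+// status code and whether the shell ran.
+func serve(admitted bool) (status int, shellRan bool) {
+	shell := http.HandlerFunc(func(response http.ResponseWriter, _ *http.Request) {
+		shellRan = true
+		response.WriteHeader(http.StatusOK)
+	})
+	recorder := httptest.NewRecorder()
+	request := httptest.NewRequest(http.MethodPost, "/admin/orders", nil)
+	admission(func() bool { return admitted }, shell).ServeHTTP(recorder, request)
+	return recorder.Code, shellRan
+}
+
+func main() {
+	fmt.Println(serve(true))  // gate admits: the shell runs and answers
+	fmt.Println(serve(false)) // gate refuses: 503 and the shell never runs
+}
+```
+
+The shell in this sketch has no knowledge of the gate, which is the property
+that lets the real route handlers and their tests stay unchanged.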
+
+**Race note.** `admit()`'s `len(chan)/cap(chan)` snapshot races with concurrent
+producers/consumers, and there is a TOCTOU window between `admit()` returning
+true and `await`'s enqueue. Both are acceptable: the threshold is a soft signal,
+and the ctx-honoring send means that even if the channel fills in the race
+window, the request either drains normally or unblocks on `ctx.Done()` — it
+never produces a wrong status the way a silent post-admit shed would.
+
+#### 5. Split channel buffer sizing
+
+Today (`pipeline.go:11-18`):
+
+```go
+batches = make(chan *batch, config.BatchCapacity)
+work1 = make(chan *unitOfWork, config.BatchCapacity)
+// ... work2..work5 same
+```
+
+Proposed:
+
+```go
+batches = make(chan *batch, config.BatchCapacity)
+work1 = make(chan *unitOfWork, config.UnitCapacity)
+work2 = make(chan *unitOfWork, config.UnitCapacity)
+work3 = make(chan *unitOfWork, config.UnitCapacity)
+work4 = make(chan *unitOfWork, config.UnitCapacity)
+work5 = make(chan *unitOfWork, config.UnitCapacity)
+```
+
+And in `fanout.go:17`, the per-worker output channels (currently hardcoded to
+1024) become `make(chan *unitOfWork, unitCapacity)` where `unitCapacity` is
+threaded through `newFanOut`'s signature (lower-blast-radius option preferred
+at implementation time).
+
+`UnitCapacity` defaults to 1. Tunable via `Options.UnitCapacity(value int)`.
+Setting it equal to `BatchCapacity` reproduces today's behavior.
+
+**Why default 1, not 0?** Fully unbuffered channels turn every stage handoff
+into a synchronization barrier — stage N can't begin unit N+1 until stage N+1
+has received unit N. Buffer-1 lets stage N finish unit N+1 *while* stage N+1
+is processing unit N. Pipelining benefit saturates at depth ~1 since each
+stage runs in a single goroutine (except serialization, which has its own
+fan-out concurrency).
+
+**Bound on in-memory drift during an outage** — with `UnitCapacity=1` and 5
+channels post-domain, plus the in-flight unit at each stage, the worst case is
+~10 units' worth of unpersisted mutations. At `UnitSize=64` that's ~640
+batches' worth of broadcast results downstream of Execution.
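+
+The arithmetic behind that bound, as a rough sketch. The `unpersistedUnits`
+function is hypothetical, and the model is an assumption chosen to match the
+"~10 units" figure: each of the 5 post-domain channels buffers `UnitCapacity`
+units and each of 5 stages holds one unit in flight. The per-worker fan-out
+buffers are ignored.
+
+```go
+package main
+
+import "fmt"
+
+// unpersistedUnits estimates the worst-case number of units sitting between
+// Execution and durable storage under the simplified model described above.
+func unpersistedUnits(channels, stages, unitCapacity int) int {
+	return channels*unitCapacity + stages
+}
+
+func main() {
+	const unitSize = 64 // batches per unit, as in the UnitSize=64 example
+
+	units := unpersistedUnits(5, 5, 1)
+	fmt.Println(units, units*unitSize) // UnitCapacity=1
+
+	units = unpersistedUnits(5, 5, 2)
+	fmt.Println(units, units*unitSize) // UnitCapacity=2, for comparison
+}
+```
+
+With `UnitCapacity=1` this yields 10 units and 640 batches; raising the knob to
+2 yields 15 units and 960 batches under the same assumptions.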
+
+The single-message `await` signature *also* tightens the upstream side: each
+HTTP-admitted batch on the `batches` channel now carries exactly one input
+message, so `BatchCapacity` becomes a direct count of in-flight HTTP commands
+rather than a count of caller invocations of arbitrary size.
+
+### Non-goals
+
+- **Rewriting the pipeline.** The structure (Entrypoint → Execution →
+ Serialization → Persistence → Completion → Broadcast → Terminal) is
+ preserved verbatim.
+- **Changing `messaging.Handler`.** `Handle(ctx, messages ...any)` keeps its
+ exact existing contract. `await` is a new (unexported) method, not a
+ replacement, and intentionally has a different signature (single message,
+ void).
+- **Returning a status outcome from the HTTP path.** Explicitly rejected — see
+ Background and Alternatives. Shedding is decided pre-flight; departed callers
+ just unblock.
+- **Buffering the HTTP response to override status post-hoc.** Not done — a
+ departed caller is already gone, so the shell's normal write to a dead
+ connection is harmless, and no `504` is emitted.
+- **Per-service route wireup.** Calling `AsHTTPHandler` / `Admission` from each
+ service's routes table happens in the consuming repos, after a tagged
+ release. Out of scope here.
+- **Domain-layer changes.** The companion `AdjustOrder` idempotency change is
+ in the consuming repos and is assumed merged before any consumer relies on a
+ departed-caller batch persisting. Nothing in this proposal touches domain
+ code.
+- **`harness/sqladapter` changes.** No code changes; just a regression check
+ via `go test`.
+
+### Files modified (this repo only)
+
+| Path | Action | Purpose |
+|--------------------------------------------------------------------|--------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `handlers/harness/00_entrypoint.go` | Modify | Extract `prepare`/`abandon`/`waiterDone`; keep `Handle` behavior identical; add unexported `await(ctx, message any)` and `admit() bool`; add `shedThreshold` field; add `loadShed`/`callerDeparted` sentinels |
+| `handlers/harness/admission.go` | Add | Unexported `admitter`/`awaiter` interfaces; `Admission(messaging.Handler, http.Handler) http.Handler` (inline `503`); `AsHTTPHandler(messaging.Handler) messaging.Handler` decorator; `shedResponseBody` |
+| `handlers/harness/contracts.go` | Modify | New `LoadShed` and `CallerDeparted` event types alongside the existing `BatchInFlight`/`BatchComplete`/etc. |
+| `handlers/harness/config.go` | Modify | `Options.UnitCapacity(int)`, `Options.ShedThreshold(float64)`; defaults 1, 0.80 |
+| `handlers/harness/pipeline.go` | Modify | Use `UnitCapacity` for `work1`–`work5`; pass it into `newFanOut`; thread `ShedThreshold` into `newEntrypoint` |
+| `handlers/harness/fanout.go` | Modify | Accept `unitCapacity` and use it for the per-worker output channels instead of the hardcoded 1024 |
+| `handlers/harness/00_entrypoint_test.go` | Modify | New tests for `await` (void, ctx-honoring, single message, pool lifecycle) and `admit` (watermark/closed/threshold), plus pinning tests for `Handle` |
+| `handlers/harness/admission_test.go` | Add | Tests for `Admission` (pass-through when admitted; inline `503` body/headers when refused) and `AsHTTPHandler` (forwards to `await`) |
+| `handlers/harness/config_test.go` | Modify | Assert defaults for `UnitCapacity`, `ShedThreshold`; assert override setters |
+| `handlers/harness/pipeline_test.go` | Modify | Adjust assertions if any depend on default channel sizes (none expected to break — defaults preserve external observable behavior) |
+| `doc/work-sessions/2026/2026-05-14_pipeline-component-diagram.svg` | Modify | Reflect split `BatchCapacity`/`UnitCapacity` knobs, the `admit` gate + `Admission` middleware, `LoadShed`/`CallerDeparted` observations, and the void `await` ingress |
+
+### Alternatives considered
+
+- **Return a `HandleOutcome` enum from the HTTP path and map it per-route.**
+ Rejected — this is the motivating problem. Across ~10 `*-context` services
+ and 70+ mutating routes, each route would replicate `503`/`504` branching and
+ the tests to cover it. Centralizing the decision in a pre-flight gate +
+ middleware keeps the route handlers void and tested once.
+- **Buffer the HTTP `ResponseWriter` in middleware to override status after the
+ shell runs (the "Option A" / `504` path).** Rejected — the consuming shells
+ flush the response *inside* the handler, so a post-hoc override requires
+ wrapping the writer in a buffering shim in this module. The only payoff would
+ be a precise `504`, but a departed caller's connection is already gone, so the
+ shell's normal write is harmless and no override is needed. More machinery, no
+ real benefit.
+- **Panic on shed + recover in middleware.** Rejected — shedding is a
+ *high-frequency* condition precisely during an outage, so this would panic on
+ a large fraction of requests and blur the "panic = bug = 500 + page someone"
+ semantics that recovery middleware exists to serve. The parent proposal
+ rejected panic-on-shed for the MQ path for the same noise reasons.
+- **Put the shedding middleware in each consuming repo.** Rejected — 10
+ services would duplicate the middleware and its tests. It lives once, here.
+- **Export `await`/`admit` (or their interfaces) for the consumer to call.**
+ Rejected — both are only ever invoked by the in-package `Admission`
+ middleware and `AsHTTPHandler` decorator, so keeping them unexported shrinks
+ the public surface to two functions and prevents consumers from reaching past
+ the adapters. The adapters take/return standard `messaging.Handler` and assert
+ to the unexported interfaces internally.
+- **Keep a hard-full `default` backstop inside `await`.** Rejected — without a
+ return value a silent post-admit shed would leave `command.Result` zero and
+ the shell would emit a wrong status (e.g. 404). The ctx-honoring send bounds
+ the enqueue wait without it; `admit()` is the watermark gate.
+- **Single shared method branching on caller type via a `ctx` value or
+ `Options.Source`.** Rejected — `streaming` acks unconditionally on clean
+ `Handle` return, so an MQ-side shed-then-return would silently drop messages.
+ The two paths require fundamentally different behavior. Separate methods make
+ the contract visible at every wiring site.
+- **Inject a per-batch `ctx` through Persistence and Broadcast.** Rejected.
+ Per-batch ctx in retry-forever stages would unwind partially-completed work
+ and break the durability principle. The pipeline ctx (`harness.New(ctx, …)`)
+ is the right scope for those stages.
+- **Keep `BatchCapacity` sizing all channels uniformly.** Rejected — preserves
+ the in-memory drift problem during outages.
+- **Add nack/error return to `messaging.Handler.Handle` itself.** Rejected as
+ out-of-scope — would touch every existing `messaging.Handler` implementation
+ across all consumers.
+
+## Trade-offs & Risks
+
+- **The only new exported surface is two functions — `AsHTTPHandler` and
+ `Admission`.** Both take and return the standard `messaging.Handler` /
+ `http.Handler` types. The `await`/`admit` methods and the `awaiter`/`admitter`
+ interfaces they assert against are unexported, so the consumer never names
+ them; `AsHTTPHandler(New(...))` and `Admission(New(...), shell)` are the whole
+ integration vocabulary. The adapters type-assert the supplied handler to the
+ unexported interfaces internally and panic at wireup if handed something other
+ than the harness entrypoint — a deliberate fail-fast on misconfiguration.
+- **This module now imports `net/http`** (in `admission.go`). Minor — it is a
+ standard-library dependency, isolated to the admission file. If desired at
+ implementation time, the middleware and decorator can move to a sibling
+ subpackage (e.g. `handlers/harness/admission`) to keep `net/http` out of the
+ core pipeline package; the unexported `awaiter`/`admitter` interfaces would
+ then need to be exported (or the adapters constructed inside `harness` and
+ re-exported). Lower-churn option (single package, unexported interfaces) is
+ preferred unless review objects.
+- **Single-message HTTP signature is a hard constraint.** A hypothetical future
+ HTTP route needing to submit multiple commands atomically would not fit.
+ Acceptable today — every existing HTTP route invokes the domain with exactly
+ one command — and reversible later (a sibling `awaitBatch` could be added
+ without breaking existing call sites).
+- **`waiterDone` allocates a goroutine and a channel per `await` call.** On the
+ HTTP path only. The cost is a few hundred bytes and one goroutine for the
+ duration of the in-flight batch — well within an HTTP request's budget. On a
+ wait-departure the goroutine parks until the pipeline eventually completes the
+ batch, then exits.
+- **`UnitCapacity=1` reduces normal-throughput headroom slightly.** Pipelining
+ is preserved (depth-1 buffer between stages) but bursty workloads that
+ previously absorbed into deep buffers now apply backpressure earlier.
+ Mitigation: configurable; set `Options.UnitCapacity(1024)` to reproduce
+ today's characteristic.
+- **Caller-departed batches keep doing work the caller no longer cares about.**
+ Intentional — matches the durability principle. Combined with the merged
+ domain-layer idempotency change, repeated retries collapse to no-ops after
+ the first applies. From this module's perspective this is a contract
+ guarantee: "we will not unwind the in-flight batch when the caller departs."
+- **The shed-threshold as a fraction is inexact, and there is a TOCTOU window
+ between `admit()` and `await`'s enqueue.** Acceptable — soft signal, not a
+ hard limit; and the ctx-honoring send means a race-window channel-full never
+ produces a wrong status (it drains or unblocks on `ctx.Done()`).
+- **`Handle` and `await` share state (`this.work`, `this.lock`, `this.closed`).**
+ Two paths writing the same channel under the same RWMutex is fine; race-free
+ under `-race`. Tests must cover both paths interleaving on a shrunk-capacity
+ fixture.
+- **Cross-repo coordination.** This module's changes are backward-compatible
+ (new options have defaults; the new exported functions don't break existing
+ `messaging.Handler` callers). A consumer that doesn't yet wrap its routes
+ keeps working unchanged. Per-service adoption is sequenced after a tagged
+ release.
+- **Diagram drift.** The pipeline diagram is the canonical visual reference; if
+ the SVG isn't updated alongside the code, reviewers will form a stale mental
+ model. Mitigation: diagram update is in the checklist.
+
+## Implementation Checklist
+
+### Phase 1: Configuration plumbing (red/green)
+
+- [x] Edit `handlers/harness/config_test.go` (`TestDefaultsPopulateCapacities`) to also assert `cfg.UnitCapacity == 1` and `cfg.ShedThreshold == 0.80`. Run `make test` — confirm failure (fields don't exist yet → compile error).
+- [x] Edit `handlers/harness/config_test.go` (`TestTunableOptionsOverrideDefaults`) to also exercise `Options.UnitCapacity(2)` and `Options.ShedThreshold(0.5)` and assert the values stick. Compile error still expected.
+- [x] Edit `handlers/harness/config.go` — add `UnitCapacity int` and `ShedThreshold float64` fields to `configuration`; add `Options.UnitCapacity(int)` and `Options.ShedThreshold(float64)` setters; add the two defaults (`UnitCapacity=1`, `ShedThreshold=0.80`) to `Options.defaults(...)`.
+- [x] Run `make test` — confirm config tests pass.
+
+### Phase 2: Pipeline rewires for split capacity (red/green)
+
+- [x] Edit `handlers/harness/pipeline.go` — change `work1`–`work5` to `make(chan *unitOfWork, config.UnitCapacity)`; thread `config.UnitCapacity` into the `newFanOut` call.
+- [x] Edit `handlers/harness/fanout.go` — extend `newFanOut`'s signature to take a `unitCapacity int` and use it where `1024` is currently hardcoded.
+- [x] Run `make test` — pipeline tests should still pass under the new defaults; if any test depends on the old 1024 buffer it should be updated to set `Options.UnitCapacity(1024)` explicitly.
+
+### Phase 3: Monitor observations and sentinels
+
+- [x] Edit `handlers/harness/contracts.go` — add `LoadShed struct{}` and `CallerDeparted struct{}` event types alongside the existing `BatchInFlight`/`BatchComplete`/etc.
+- [x] Edit `handlers/harness/00_entrypoint.go` — add unexported sentinel values `var loadShed LoadShed` and `var callerDeparted CallerDeparted` next to the existing `batchInFlight`/`batchComplete`.
+- [x] Run `make test` — confirm the existing suite still compiles and passes.
+
+### Phase 4: Extract shared helpers (pure refactor — keep `Handle` behavior identical)
+
+- [x] Refactor `handlers/harness/00_entrypoint.go` to extract `prepare(ctx, messages ...any) (*sync.WaitGroup, *batch)`, `abandon(waiter, item)`, and `waiterDone(waiter) chan struct{}`; rewrite `Handle`'s body in terms of `prepare(ctx, messages...)` so it is observably identical.
+- [x] Run `make test` — all existing tests must still pass; this step changes no externally observable behavior.
+
+### Phase 5: Add `await` (TDD, void HTTP path, single message)
+
+- [x] Add `TestAwait_ReturnsAfterCompletion` — call `await(ctx, "msg")` with a single message; let the pipeline complete; assert the call returns and the batch was processed. Run `make test` — confirm failure (no `await` method yet → compile error). Note: the entrypoint must hold a `shedThreshold` field wired through `newEntrypoint` from `pipeline.go`.
+- [x] Add the unexported `await(ctx context.Context, message any)` method on `*entrypoint` with the body shown in §3 of Approach. Run — confirm `ReturnsAfterCompletion` passes.
+- [x] Add `TestAwait_UnblocksOnContextCancelWhileWaiting` — fixture with a writer that blocks forever; enqueue succeeds; cancel the caller's `ctx`; assert `await` returns, Monitor sees `CallerDeparted{}`, and the batch is **not** abandoned (pipeline still owns it). Run — confirm passing.
+- [x] Add `TestAwait_UnblocksOnContextCancelWhileEnqueuing` — fixture with `BatchCapacity=1` and a writer that blocks forever so the work channel stays full; cancel `ctx` before a slot frees; assert `await` returns, Monitor sees `CallerDeparted{}`, and the pool entry is restored (the never-enqueued batch is abandoned). Run — confirm passing.
+- [x] Add `TestAwait_BatchCarriesExactlyOneMessage` — `await(ctx, "only")`; intercept the resulting `*batch` on the work channel; assert `len(item.messages) == 1` and `item.messages[0] == "only"`. Run — confirm passing (pins the single-message contract).
+- [x] Add `TestAwait_ClosedPipelineReturnsImmediately` — close the entrypoint; call `await(ctx, "msg")`; assert it returns within a few milliseconds and the pool entry is returned. Run — confirm passing.
+
+### Phase 6: Add `admit()` gate (TDD)
+
+- [x] Add `TestAdmit_TrueWhenBelowThreshold` — fresh entrypoint, empty work channel; assert `admit()` is true. Run — confirm failure (no `admit` yet → compile error).
+- [x] Add the unexported `admit() bool` method on `*entrypoint` with the body shown in §4. Run — confirm `TrueWhenBelowThreshold` passes.
+- [x] Add `TestAdmit_FalseAtOrAboveThreshold_TracksLoadShed` — `BatchCapacity=10`, `ShedThreshold=0.5`, writer blocks forever; fill the work channel to ≥5; assert `admit()` is false and Monitor sees `LoadShed{}`. Run — confirm passing.
+- [x] Add `TestAdmit_FalseWhenClosed_NoLoadShed` — close the entrypoint; assert `admit()` is false and Monitor sees **no** `LoadShed{}` (shutdown, not load). Run — confirm passing.
+- [x] Add `TestAdmit_ThresholdAtOrAboveOneDisablesWatermark` — `ShedThreshold=2.0`; fill the channel; assert `admit()` stays true until the pipeline is closed. Run — confirm passing.
+
+### Phase 7: Add `AsHTTPHandler` decorator and `Admission` middleware (TDD, in-module)
+
+- [x] Add `TestAsHTTPHandler_ForwardsSingleMessageToAwait` (in `admission_test.go`) — a fake that implements `messaging.Handler` plus the unexported `await` (so it satisfies the internal `awaiter` assertion) records calls; `AsHTTPHandler(fake).Handle(ctx, "x")`; assert exactly one `await(ctx, "x")`. Run — confirm failure (no `AsHTTPHandler` yet).
+- [x] Add the unexported `awaiter` interface, `httpAdapter`, and `AsHTTPHandler` to `handlers/harness/admission.go`. Run — confirm passing.
+- [x] Add `TestAsHTTPHandler_ForwardsEachMessageInOrder` — `Handle(ctx, "a", "b")`; assert `await` called twice, in order (documents the variadic-to-single adaptation). Run — confirm passing.
+- [x] Add `TestAdmission_PassesThroughWhenAdmitted` — a fake implementing `messaging.Handler` plus the unexported `admit` returning true wraps a recording inner handler; serve a request; assert the inner handler ran and its response is preserved. Run — confirm failure (no `Admission` yet).
+- [x] Add the unexported `admitter` interface, `Admission`, and `shedResponseBody` to `handlers/harness/admission.go`. Run — confirm `PassesThroughWhenAdmitted` passes.
+- [x] Add `TestAdmission_Writes503WhenRejected` — fake `admit` returning false; serve a request; assert the inner handler did **not** run, status is `503`, `Content-Type` is `application/json; charset=utf-8`, `Retry-After` is `1`, and the body equals the shed JSON. Run — confirm passing.
+
+### Phase 8: Pin the existing `Handle` contract (TDD, MQ/cron path)
+
+- [x] Add `TestHandle_BlocksUntilDurable` — submit a batch via `Handle`; the writer takes a controlled delay to acknowledge; assert `Handle` returns only after the writer completes. Run — confirm passing (pins the contract `streaming` depends on).
+- [x] Add `TestHandle_DoesNotShedAtHighWatermark` — `BatchCapacity=2`, `ShedThreshold=0.5`, writer blocks forever; submit 5 batches via `Handle` (each in its own goroutine); assert all 5 are blocked; after unblocking, all 5 eventually return. Run — confirm passing.
+- [x] Add `TestHandle_IgnoresContextCancel` — submit a batch via `Handle`; cancel the ctx; assert `Handle` is still blocked until the pipeline completes the batch. Run — confirm passing (deliberate contract: MQ deliveries don't honor a deadline).
+- [x] Add `TestHandle_ReturnsImmediatelyOnClosedPipeline` — close the entrypoint; call `Handle`; assert it returns within a few milliseconds (no panic, no block). Run — confirm passing.
+- [x] Add `TestHandle_PreservesVariadicMessages` — `Handle(ctx, "a", "b", "c")`; intercept the resulting `*batch`; assert `len(item.messages) == 3`. Run — confirm passing (pins the variadic contract after the `prepare` refactor).
+
+### Phase 9: Race and integration sanity
+
+- [x] Run the full harness test suite under `-race`: `go test -race ./handlers/harness/...`. Confirm green. (Core `harness` package green; `sqladapter` requires a live MySQL — verified green with `-short`, exercised against DB in item 3 below.)
+- [x] Run `make test` (the project-level entry point that also runs `go mod tidy`, `go fmt ./...`). Confirm green. (`go fmt` clean; full `-short -race` suite green. `go mod tidy` blocked by a root-owned module cache in this sandbox — environment limitation, not a code issue.)
+- [x] Run `go test ./handlers/harness/sqladapter/...` against a live MySQL (drop `-short` if present, or use the project's `make test.db.local`-equivalent). Confirm no regressions in the SQL adapter. (No docker/MySQL in this sandbox; package compiles and `go vet`s clean and no sqladapter code was touched. **Follow-up: run `make test.db.local` locally to fully confirm.**)
+
+### Phase 10: Documentation
+
+- [x] Update `doc/work-sessions/2026/2026-05-14_pipeline-component-diagram.svg` to reflect: split `BatchCapacity`/`UnitCapacity` knobs, the `admit` gate + `Admission` middleware in front of the HTTP ingress, `LoadShed`/`CallerDeparted` Monitor observations, and the void `await(ctx, message any)` ingress alongside `Handle`. (The current SVG shows a single `BatchCapacity` annotation and the `Handle` ingress; both need updating.)
+- [x] Self-review the diff: confirm no `messaging.Handler` callers were inadvertently broken; confirm `admission.go` imports no `scuter` and mentions no `scuter` in comments; confirm `Options.UnitCapacity(1024)` reproduces today's runtime if a user wants it; confirm no domain code or sqladapter code was touched.
+
+### Out of scope (handled in each consuming repo's follow-on)
+
+- Per-service route wireup (`AsHTTPHandler` + `Admission` in each routes table), application-side monitor metric registration, integration tests, and post-deploy observation drills. These are addressed per service after this module's changes merge and a version is tagged.
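
Note on the Phase 6 checklist above: the `admit()` tests pin four behaviors (admit below the watermark, shed at or above it, reject without `LoadShed{}` when closed, and no watermark when `ShedThreshold` is at or above 1). The following is a minimal sketch of that decision table as a pure function, not the body from §4 of Approach, which is not shown in this patch. The `decision` type and the `decide` function are hypothetical names introduced only for illustration; the real `admit()` returns a bool and reports shedding through the Monitor.

```go
package main

import "fmt"

// decision is a hypothetical result type for this sketch only.
type decision struct {
	admit bool // true when the request may enter the pipeline
	shed  bool // true when a LoadShed{} observation would be tracked
}

// decide models the watermark check pinned by the Phase 6 tests.
// queued stands for the current length of the work channel, capacity for
// BatchCapacity, and threshold for ShedThreshold (all assumptions of this sketch).
func decide(queued, capacity int, threshold float64, closed bool) decision {
	if closed {
		// Shutdown, not load: reject without reporting a shed.
		return decision{admit: false, shed: false}
	}
	if threshold >= 1 {
		// A threshold at or above 1 disables the watermark entirely.
		return decision{admit: true, shed: false}
	}
	if float64(queued) >= threshold*float64(capacity) {
		return decision{admit: false, shed: true}
	}
	return decision{admit: true, shed: false}
}

func main() {
	fmt.Println(decide(0, 10, 0.5, false))  // empty channel
	fmt.Println(decide(5, 10, 0.5, false))  // at the watermark
	fmt.Println(decide(10, 10, 2.0, false)) // watermark disabled
	fmt.Println(decide(0, 10, 0.5, true))   // closed pipeline
}
```

Keeping the closed check first mirrors `TestAdmit_FalseWhenClosed_NoLoadShed`: a closed pipeline rejects the request but must not be counted as load shedding.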
diff --git a/doc/work-sessions/2026/2026-06-01_12-19-49-proposal-waiter-pool-corruption-fix.md b/doc/work-sessions/2026/2026-06-01_12-19-49-proposal-waiter-pool-corruption-fix.md
new file mode 100644
index 0000000..856a00b
--- /dev/null
+++ b/doc/work-sessions/2026/2026-06-01_12-19-49-proposal-waiter-pool-corruption-fix.md
@@ -0,0 +1,254 @@
+---
+name: Fix pooled WaitGroup corruption in entrypoint.await
+description: Stop returning a pooled *sync.WaitGroup to the pool while its count is non-zero and a detached Wait() goroutine still references it, eliminating a sync.WaitGroup-misuse / cross-request corruption hazard on the caller-departed path.
+type: plot
+---
+
+# Proposal: Fix pooled `WaitGroup` corruption in `entrypoint.await`
+
+## Background
+
+`handlers/harness/00_entrypoint.go` exposes two ingress paths into the pipeline:
+
+- `Handle(ctx, messages...)` — the at-least-once (RMQ) path. It enqueues a batch
+ and blocks **inline** on `waiter.Wait()` until the pipeline calls `complete()`.
+- `await(ctx, message)` — the context-honoring (HTTP) path. It enqueues a batch
+ and then waits on **either** completion **or** `ctx.Done()`, so a departed
+ caller (cancelled request) does not block on durable processing.
+
+Both paths share `prepare(...)`, which leases a `*sync.WaitGroup` from a
+`sync.Pool` (`this.waiters`), calls `waiter.Add(1)`, and captures that waiter in
+the batch's `complete` closure:
+
+```go
+func (this *entrypoint) prepare(ctx context.Context, messages ...any) (waiter *sync.WaitGroup, item *batch) {
+	waiter = this.waiters.Get()
+	waiter.Add(1)
+	item = this.batches.Get()
+	item.ctx = ctx
+	item.messages = messages
+	item.complete = func() {
+		waiter.Done()
+		this.monitor.Track(batchComplete)
+		this.batches.Put(item)
+	}
+	return waiter, item
+}
+```
+
+`complete()` is invoked **later**, by the pipeline's `completion` stage
+(`04_completion.go`), once the batch is durably persisted.
+
+### The bug
+
+`await` returns the waiter to the pool unconditionally via `defer`:
+
+```go
+func (this *entrypoint) await(ctx context.Context, message any) {
+	...
+	waiter, item := this.prepare(ctx, message)
+	defer this.waiters.Put(waiter) // (B) blanket Put: the defect
+
+	select {
+	case this.work <- item:
+		this.lock.RUnlock()
+		this.monitor.Track(batchInFlight)
+	case <-ctx.Done():
+		this.lock.RUnlock()
+		this.abandon(waiter, item) // calls waiter.Done() -> count 0
+		this.monitor.Track(callerDeparted)
+		return
+	}
+
+	select {
+	case <-this.waiterDone(waiter): // detached goroutine: waiter.Wait(); close(done)
+	case <-ctx.Done(): // (A) caller departed WHILE waiting
+		this.monitor.Track(callerDeparted)
+	}
+}
+```
+
+On path **(A)** — the caller's context is cancelled *after* the batch was
+successfully enqueued — `await` returns and the deferred `Put` (B) returns the
+waiter to the pool. But at that instant:
+
+1. The pipeline still owns the batch and **has not yet called `complete()`**, so
+ the waiter's internal counter is still `1` (the `Add(1)` from `prepare` has no
+ matching `Done()` yet).
+2. The detached goroutine spawned by `waiterDone(waiter)` is **still blocked
+ inside `waiter.Wait()`**, holding a live reference to the same waiter.
+
+A subsequent `prepare()` can then `Get()` that very waiter and call `Add(1)`
+again. Per the `sync.WaitGroup` contract, *"if a WaitGroup is reused to wait for
+several independent sets of events, new `Add` calls must happen after all
+previous `Wait` calls have returned"*. Here a new `Add` races a previous `Wait`
+that has not returned, which is documented misuse. Consequences range from the
+Go runtime panicking (`sync: WaitGroup is reused before previous Wait has
+returned` / `sync: WaitGroup misuse: Add called concurrently with Wait`) to silent
+cross-request corruption, where one HTTP request's `await` blocks until an
+unrelated earlier batch also completes.
+
+### Why the other paths are safe (and stay safe)
+
+| Path | State of waiter when returned to pool | Detached `Wait()` goroutine? | Safe? |
+|------------------------------------|--------------------------------------------------|---------------------------------|--------|
+| `Handle` normal | `Wait()` returned inline → count 0 | none (waits inline) | yes |
+| `await` enqueue-failed (`abandon`) | `abandon` called `Done()` → count 0 | none (2nd select not reached) | yes |
+| `await` normal completion | `waiterDone` fired → `Wait()` returned → count 0 | finished before `done` received | yes |
+| `await` departed-while-waiting (A) | count still 1, `complete()` pending | **still blocked in `Wait()`** | **NO** |
+
+Only path (A) is defective. The fix must keep the safe paths recycling waiters
+(the pool exists to cut allocations under load) while never recycling a waiter
+that is still "in use."
+
+## Approach
+
+**Recommended: Option A — targeted lifecycle fix (minimal diff).**
+
+Remove the blanket `defer this.waiters.Put(waiter)` and instead return the
+waiter to the pool *only* at the two points in `await` where it is provably quiescent
+(counter at 0, no detached goroutine still reading it). On the departed-while-
+waiting path, deliberately do **not** recycle the waiter: let it fall out of
+scope. `complete()` will still fire later, unblocking the detached `waiterDone`
+goroutine, after which both the waiter and the goroutine become garbage. We
+"lose" exactly one pooled waiter per departed-in-flight request — an exceptional
+path — which `sync.Pool` is designed to tolerate.
+
+Revised `await`:
+
+```go
+func (this *entrypoint) await(ctx context.Context, message any) {
+	this.lock.RLock()
+	if this.closed {
+		this.lock.RUnlock()
+		return
+	}
+
+	waiter, item := this.prepare(ctx, message)
+
+	select {
+	case this.work <- item:
+		this.lock.RUnlock()
+		this.monitor.Track(batchInFlight)
+	case <-ctx.Done():
+		this.lock.RUnlock()
+		this.abandon(waiter, item)
+		this.waiters.Put(waiter) // safe: abandon() called Done() (count 0); no detached Wait() goroutine exists yet.
+		this.monitor.Track(callerDeparted)
+		return
+	}
+
+	select {
+	case <-this.waiterDone(waiter):
+		this.waiters.Put(waiter) // safe: detached Wait() returned before done was closed (count 0).
+	case <-ctx.Done():
+		this.monitor.Track(callerDeparted)
+		// Intentionally do NOT recycle the waiter here: complete() is still
+		// pending and the detached waiterDone goroutine is still inside Wait().
+		// Returning it to the pool now would let a later prepare() call Add(1)
+		// before this Wait() returns -- documented sync.WaitGroup misuse. The
+		// waiter is released to GC once complete() fires.
+	}
+}
+```
+
+`Handle` is unchanged: its `defer this.waiters.Put(waiter)` runs only after its
+inline `waiter.Wait()` returns, so the waiter is quiescent at recycle time.
+
+`prepare`, `abandon`, and `waiterDone` are unchanged.
+
+### Alternative considered — Option B: replace the pooled WaitGroup with a single-use completion channel
+
+Each ingress waits for **exactly one** `complete()` call, so a `sync.WaitGroup`
+is heavier than needed. We could give `batch` a `done chan struct{}` that
+`complete()` closes once; `Handle` does `<-done` and `await` does
+`select { case <-done: ... case <-ctx.Done(): ... }`. This eliminates the
+detached `waiterDone` goroutine and the entire pool-reuse hazard class, because
+a channel is single-use and never recycled — an abandoned `done` channel is
+simply collected by GC.
+
+Rejected as the primary fix because:
+
+- It is a larger change that touches `Handle`'s hot path and removes the
+ `waiters` pool the author added deliberately for allocation reduction.
+- It trades pooled-waiter reuse for a per-request channel allocation.
+- The user's request is scoped to *resolving the corruption concern*, and
+ Option A does so with a minimal, reviewable diff while preserving the
+ existing design intent and all current passing tests.
+
+Option B remains a reasonable future simplification if the detached goroutine or
+per-request waiter churn proves costly; noting it here so the decision is
+explicit. **Open question for the reviewer:** prefer the minimal Option A, or
+take the larger Option B cleanup now?
+
+## Trade-offs & Risks
+
+- **One pooled waiter is dropped per departed-in-flight request.** This is the
+ exceptional path (HTTP caller cancelled after enqueue). Under sustained
+ cancellation storms the pool simply allocates fresh waiters; `sync.Pool`
+ absorbs this without leaking memory (dropped waiters are GC'd once their
+ pending `complete()` fires). No unbounded growth.
+- **The detached `waiterDone` goroutine still lingers until `complete()`** on the
+ departed path. This is pre-existing behavior and is bounded by pipeline drain
+ time; the work is genuinely still in flight, so tracking its completion with
+ one goroutine is acceptable. Option A does not worsen this; Option B would
+ remove it.
+- **Deterministic red-test difficulty.** The corruption only manifests on pool
+ *reuse* under concurrency, and `sync.Pool.Get` gives no reuse guarantee, so a
+ strictly deterministic failing unit test is not practical. The honest red test
+ is a race-enabled stress test (see Phase 1) that reliably trips Go's built-in
+ `WaitGroup` misuse detector before the fix and passes after. This is called out
+ explicitly rather than hidden behind a false "deterministic" claim.
+
+## Implementation Checklist
+
+### Phase 1: Capture the corruption (red)
+
+- [x] Add `TestAwait_DepartedInFlightDoesNotCorruptPooledWaiter` to
+ `handlers/harness/00_entrypoint_test.go`. The test, in a loop sized for
+ reliable surfacing (e.g. a few thousand iterations), should: (1) start an
+ `await` whose context it cancels *after* the batch is received from
+ `this.work` (departed-while-waiting), deferring the matching `complete()` to a
+ background goroutine; (2) concurrently issue fresh `await`/`Handle` calls on
+ the **same** entrypoint that drive `prepare()` (and thus pool `Get`/`Add`),
+ completing each; so a recycled-too-early waiter is exercised by a new request.
+ (Implemented as 8 worker goroutines × 2000 departing `await`s against a
+ concurrent drainer that completes each enqueued batch — this produces the
+ recycle/reuse race more reliably than a sequential loop.)
+- [x] Run `go test -race -run TestAwait_DepartedInFlightDoesNotCorruptPooledWaiter ./handlers/harness/`
+ and confirm it fails for the right reason: a `sync: WaitGroup ...` misuse panic
+ or a race report on the waiter (NOT a generic assertion mismatch). Record the
+ observed failure mode in the PR description.
+ **Observed:** `WARNING: DATA RACE` on the waiter (read in `await` at
+ `00_entrypoint.go:80` vs. write from the detached `waiterDone` goroutine) and
+ `panic: sync: WaitGroup is reused before previous Wait has returned`.
+
+### Phase 2: Apply the lifecycle fix (green)
+
+- [x] In `00_entrypoint.go`, remove `defer this.waiters.Put(waiter)` from
+ `await`.
+- [x] In `await`, add `this.waiters.Put(waiter)` immediately after
+ `this.abandon(waiter, item)` on the enqueue-failed branch.
+- [x] In `await`, add `this.waiters.Put(waiter)` in the
+ `case <-this.waiterDone(waiter):` branch (normal completion).
+- [x] In `await`, in the `case <-ctx.Done():` branch of the second `select`,
+ add the explanatory comment documenting why the waiter is intentionally NOT
+ recycled there.
+- [x] Confirm `Handle` is unchanged and still recycles via its inline-`Wait()`
+ `defer this.waiters.Put(waiter)`.
+
+### Phase 3: Verify (green) and guard regressions
+
+- [x] Run `go test -race -run TestAwait_DepartedInFlightDoesNotCorruptPooledWaiter ./handlers/harness/`
+ and confirm it now passes with no panic and a clean race report.
+- [x] Confirm the existing departed-path tests still pass unchanged:
+ `TestAwait_UnblocksOnContextCancelWhileWaiting` (departed-while-waiting still
+ completes and tracks `CallerDeparted` + later `BatchComplete`) and
+ `TestAwait_UnblocksOnContextCancelWhileEnqueuing` (abandon path still tracks
+ `CallerDeparted`, no `BatchInFlight`/`BatchComplete`).
+- [x] Run the full package suite: `make test` (exercises `go fmt`, `go vet`,
+ `-race`, coverage). Confirm green. (All packages pass; `handlers/harness`
+ coverage 99.6%.)
+- [x] Re-read the diff against `## CLAUDE.md` Go conventions: receiver named
+ `this`, no naked returns, no new blank lines at method start/end, struct
+ initializers use field/value pairs (no new initializers introduced here).
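
Note on Option B from the proposal above: it is described only in prose, so the following is a minimal sketch of what a single-use completion channel could look like. It is not the adopted fix (Option A is), and the pared-down `batch` type, `newBatch`, `awaitDone`, and `scenario` are hypothetical names introduced for illustration only.

```go
package main

import (
	"context"
	"fmt"
)

// batch is a pared-down, hypothetical stand-in for the harness batch type.
type batch struct {
	done chan struct{} // closed exactly once by complete(); never pooled or reused
}

func newBatch() *batch {
	return &batch{done: make(chan struct{})}
}

// complete signals durable completion. It must be called at most once per batch.
func (this *batch) complete() {
	close(this.done)
}

// awaitDone waits for completion or caller departure, whichever comes first.
// No detached Wait() goroutine is needed because the channel is selectable.
func awaitDone(ctx context.Context, item *batch) string {
	select {
	case <-item.done:
		return "completed"
	case <-ctx.Done():
		return "departed"
	}
}

// scenario runs one wait: the batch completes when finish is true,
// otherwise the caller's context is cancelled before completion.
func scenario(finish bool) string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	item := newBatch()
	if finish {
		item.complete()
	} else {
		cancel()
	}
	return awaitDone(ctx, item)
}

func main() {
	fmt.Println(scenario(true))
	fmt.Println(scenario(false))
}
```

On the departed path the `done` channel is simply left for the garbage collector once `complete()` has closed it, which is why this shape has no recycle-too-early hazard. The cost, as the proposal notes, is one channel allocation per request.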
diff --git a/doc/work-sessions/2026/2026-06-01_13-56-53-proposal-retry-loop-cancellation.md b/doc/work-sessions/2026/2026-06-01_13-56-53-proposal-retry-loop-cancellation.md
new file mode 100644
index 0000000..2da4c15
--- /dev/null
+++ b/doc/work-sessions/2026/2026-06-01_13-56-53-proposal-retry-loop-cancellation.md
@@ -0,0 +1,373 @@
+---
+name: Bound the unbounded retry loops in persistence and broadcast
+description: Make the persistence and broadcast stages' retry loops cancellable via the pipeline context so a permanently-failing Writer/Dispatcher can no longer hang pipeline shutdown, while preserving store-and-forward durability (persistence is the ack gate; broadcast is recoverable via the startup recovery routine).
+type: plot
+---
+
+# Proposal: Bound the unbounded retry loops in `persistence` and `broadcast`
+
+## Background
+
+Two pipeline stages retry their collaborator forever with no escape hatch:
+
+`handlers/harness/03_persistence.go:39`
+
+```go
+for attempt := 1; ; attempt++ {
+	err := this.writer.Write(this.ctx, this.buffer...)
+	if err == nil {
+		failure.Attempt = 0
+		failure.Error = nil
+		break
+	}
+	failure.Attempt = attempt
+	failure.Error = fmt.Errorf("%w: %w", ErrPersistence, err)
+	this.monitor.Track(failure)
+	this.sleep(time.Second) // TODO: exponential back-off w/ jitter
+}
+```
+
+`handlers/harness/05_broadcast.go:39` is identical in shape (against `Dispatcher.Dispatch`).
+
+Both loops:
+
+1. Have **no maximum attempt count** — they spin until the collaborator
+ succeeds.
+2. **Never consult `this.ctx.Done()`.** The pipeline context is passed *into*
+ `Write`/`Dispatch` (so a ctx-honoring driver aborts the in-flight call), but
+ the loop itself ignores cancellation and retries after the next sleep.
+3. Sleep via an **uninterruptible** `this.sleep(time.Second)` (wired to
+ `time.Sleep` in `pipeline.go`).
+
+### The failure mode
+
+Shutdown drains the pipeline by closing channels from the front: `entrypoint.Close()`
+closes the batches channel, `execution` then closes its output, and the close
+cascades stage-by-stage (`for unit := range this.input` exits, then
+`defer close(this.output)` fires). Each stage finishes the **unit it is
+currently holding** before observing the closed input.
+
+If the database (persistence) or broker (broadcast) is unreachable when shutdown
+begins, the stage holding an in-flight unit never breaks out of its retry loop,
+never returns to `range this.input`, never observes the closed channel, and
+**never closes its output**. The drain stalls, the remaining stages never see
+their inputs close, and shutdown hangs until the orchestrator SIGKILLs the
+process after its grace period.
+
+### The design constraint that shapes the fix
+
+The pipeline is store-and-forward, and the stage order matters
+(`handlers/harness/pipeline.go`):
+
+```
+entrypoint → execution → serialization → persistence → completion → broadcast → terminal
+                                         (03)          (04)         (05)
+```
+
+`complete()` — which calls `waiter.Done()` (unblocking the `Handle`/`await`
+caller) and lets the upstream MQ delivery be acked — fires in the **completion**
+stage (04), which runs **after persistence (03)** and **before broadcast (05)**.
+
+That ordering creates a hard asymmetry between the two failing stages:
+
+| Stage | Runs vs. ack | On give-up, is the message recoverable? |
+|--------------------|----------------|----------------------------------------------------------------------------------------------------------------------------------|
+| `persistence` (03) | **before** ack | No. If the row was never written, forwarding it acks/loses the message. Only MQ redelivery recovers it. |
+| `broadcast` (05) | **after** ack | Yes. The row is already durably stored; `sqladapter.Recover` re-dispatches every row `WHERE dispatched IS NULL` at next startup. |
+
+The resilience proposal
+(`2026-05-28_15-30-32-proposal-harness-resilience-module-changes.md`) already
+ruled out per-batch context in these stages and fixed their cancellation scope:
+
+> **Inject a per-batch `ctx` through Persistence and Broadcast.** Rejected.
+> Per-batch ctx in retry-forever stages would unwind partially-completed work and
+> break the durability principle. The pipeline ctx (`harness.New(ctx, …)`) is the
+> right scope for those stages.
+
+So the intended (but not-yet-implemented) cancellation signal for these loops is
+the **pipeline context** — the `ctx` the consumer passes to `harness.New(ctx, …)`
+and is expected to cancel on shutdown. This proposal wires that signal in.
+
+## Approach
+
+Make both retry loops **cancellable via `this.ctx`**, and act on cancellation
+according to each stage's recoverability (the asymmetry above):
+
+- **`broadcast` (recoverable):** on `this.ctx` cancellation, stop retrying and
+ **forward the unit** to `terminal` as usual. The message is already persisted;
+ `sqladapter.Recover` redispatches it on the next startup. No data loss, clean
+ shutdown.
+
+- **`persistence` (durability gate):** on `this.ctx` cancellation, stop retrying
+ and **drop the unit without forwarding it** — `complete()` is never called, so
+ the upstream MQ delivery is never acked and is redelivered on the next run. The
+ stage then continues its `range` loop and drains normally, so shutdown
+ proceeds. Forwarding here is *not* an option: it would reach the completion
+ stage, ack the caller, and lose a message that was never stored.
+
+#### HTTP-origin units (the `await` path)
+
+The drop decision is identical for HTTP-submitted units, but the *recovery*
+mechanism differs, because there is no broker to redeliver. When persistence
+drops an HTTP-origin unit on shutdown:
+
+- `complete()` never fires, so the in-flight `await` caller is **not** unblocked
+ via the completion path. It unblocks instead when its own request context is
+ cancelled (the `case <-ctx.Done()` arm of `await`'s second select), tracking
+ `CallerDeparted` — exactly today's departed-caller behavior. Whether that
+ request context is actually cancelled at shutdown is a consumer-side graceful-
+ shutdown concern (does the HTTP server cancel active request contexts?), out of
+ scope here.
+- Recovery relies on **client retry + domain idempotency**, the model the
+ resilience proposal already established for the HTTP path: a client whose
+ request did not durably succeed retries, and, once the merged domain-layer
+ idempotency change is in place, repeated retries collapse to no-ops after the
+ first one applies. There is no `sqladapter.Recover` safety net for an un-stored
+ unit (recovery only re-dispatches rows that *were* stored), so the client retry
+ is the sole recovery path. That is why dropping (rather than forwarding) is
+ still the correct choice: forwarding would unblock the caller as though the
+ write succeeded, falsely confirming a message that was never stored.
+
+There is no regression versus today: an HTTP request in flight against a dead
+database currently hangs until SIGKILL just the same; this change makes the
+process shut down cleanly instead.
+
+A new interruptible backoff replaces the uninterruptible sleep, so shutdown
+aborts the delay promptly instead of waiting out a full second per attempt.
+
+### The cancellable-wait seam
+
+Replace the injected `sleep func(time.Duration)` with an injected, ctx-aware
+waiter:
+
+```go
+// retry.go (new)
+package harness
+
+import (
+	"context"
+	"time"
+)
+
+// wait sleeps for d, or returns early with ctx.Err() if ctx is cancelled first.
+func wait(ctx context.Context, d time.Duration) error {
+	timer := time.NewTimer(d)
+	defer timer.Stop()
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-timer.C:
+		return nil
+	}
+}
+```
+
+`pipeline.go` passes `wait` where it currently passes `time.Sleep`. Tests inject
+a fixture method that records the requested durations (preserving the existing
+`sleeps`-style assertions) and can simulate cancellation by returning a non-nil
+error.
+
+> **Why inject the wait function rather than a duration?** A reasonable
+> alternative is to keep `wait` as a plain free function, store only a
+> `time.Duration` on each stage, and have the stage call `wait(this.ctx, this.delay)`
+> directly. That is cleaner production code. It is rejected for one reason: the
+> **test seam**. The stages today inject `sleep func(time.Duration)` precisely so
+> the fixtures can (a) record how many times and for how long the loop backed off
+> and (b) run instantly without real 1-second sleeps. If we called the real `wait`
+> directly, every retry test would block on actual wall-clock time and could not
+> observe the backoff. Keeping the injected-function seam preserves the existing
+> `TestRetriesUntil…Succeeds` assertions verbatim and lets a fixture simulate
+> cancellation synchronously by returning an error on a chosen attempt. The free
+> `wait` function still exists — it is simply the production value wired in at
+> `pipeline.go`, while tests substitute their own. (When exponential backoff lands
+> later, the *duration* moves inside `wait`/its production implementation; the
+> injected-function seam stays, so this decision does not block that work.)
+
+This keeps the fixed 1-second delay; **exponential back-off with jitter remains a
+separate, orthogonal TODO** and is out of scope here — this change is strictly
+about bounding/cancelling the loops.
+
+### Revised `persistence`
+
+```go
+func (this *persistence) Listen() {
+	defer close(this.output)
+	for unit := range this.input {
+		for _, message := range unit.results {
+			this.buffer = append(this.buffer, message)
+		}
+		stored := this.store()
+		this.buffer = this.buffer[:0]
+		if !stored {
+			continue // shutdown before durable write: do NOT forward (no ack); MQ redelivers
+		}
+		this.output <- unit
+	}
+}
+
+func (this *persistence) store() (stored bool) {
+	var failure PersistenceError
+	for attempt := 1; ; attempt++ {
+		err := this.writer.Write(this.ctx, this.buffer...)
+		if err == nil {
+			return true
+		}
+		failure.Attempt = attempt
+		failure.Error = fmt.Errorf("%w: %w", ErrPersistence, err)
+		this.monitor.Track(failure)
+		if this.wait(this.ctx, time.Second) != nil {
+			this.monitor.Track(PersistenceAbandoned{Attempts: attempt})
+			return false
+		}
+	}
+}
+```
+
+### Revised `broadcast`
+
+```go
+func (this *broadcast) Listen() {
+	defer close(this.output)
+	for unit := range this.input {
+		for _, message := range unit.results {
+			this.buffer = append(this.buffer, message)
+		}
+		this.dispatch()
+		this.buffer = this.buffer[:0]
+		this.output <- unit // always forward: already persisted; recovery redispatches if not sent
+	}
+}
+
+func (this *broadcast) dispatch() {
+	var failure BroadcastError
+	for attempt := 1; ; attempt++ {
+		err := this.dispatcher.Dispatch(this.ctx, this.buffer...)
+		if err == nil {
+			return
+		}
+		failure.Attempt = attempt
+		failure.Error = fmt.Errorf("%w: %w", ErrBroadcast, err)
+		this.monitor.Track(failure)
+		if this.wait(this.ctx, time.Second) != nil {
+			this.monitor.Track(BroadcastAbandoned{Attempts: attempt})
+			return
+		}
+	}
+}
+```
+
+### New Monitor observations (`contracts.go`)
+
+```go
+PersistenceAbandoned struct{ Attempts int } // emitted when persistence stops retrying due to shutdown; the unit was dropped and the upstream message will be redelivered
+BroadcastAbandoned struct{ Attempts int } // emitted when broadcast stops retrying due to shutdown; the message is persisted and will be redispatched by recovery
+```
+
+These give operators an explicit, alertable signal that a shutdown abandoned
+in-flight work — distinct from the per-attempt `PersistenceError`/`BroadcastError`.
+
+### Files created / modified
+
+| File | Change |
+|-------------------------------------------|---------------------------------------------------------------------------------------------|
+| `handlers/harness/retry.go` | **New.** The `wait(ctx, d)` interruptible-backoff helper. |
+| `handlers/harness/03_persistence.go` | Replace `sleep` field/param with `wait`; extract `store()`; drop-without-forward on cancel. |
+| `handlers/harness/05_broadcast.go` | Replace `sleep` field/param with `wait`; extract `dispatch()`; forward-on-cancel. |
+| `handlers/harness/contracts.go` | Add `PersistenceAbandoned` and `BroadcastAbandoned` observation types. |
+| `handlers/harness/pipeline.go` | Pass `wait` to `newPersistence`/`newBroadcast` instead of `time.Sleep`. |
+| `handlers/harness/03_persistence_test.go` | Rename `sleep`→`wait` seam; add ctx-cancel abandonment test (unit **not** forwarded). |
+| `handlers/harness/05_broadcast_test.go` | Rename `sleep`→`wait` seam; add ctx-cancel abandonment test (unit **is** forwarded). |
+
+### Alternatives considered
+
+- **Persistence keeps retrying forever (only fix broadcast).** Treat persistence's
+ infinite retry as intentional durability behavior and rely on the orchestrator's
+ grace-period + SIGKILL + MQ redelivery to bound shutdown. Simpler, and arguably
+ "more durable" (never gives up). Rejected as the recommendation because it leaves
+ the original concern — a hung shutdown when the DB is down — unresolved, trading a
+ clean shutdown for a SIGKILL every time. Presented here as the headline open
+ question (see below); it is a one-line variant of this proposal (skip the
+ persistence drop path, keep the loop uninterruptible).
+
+- **Bounded max-attempts, then forward.** Give each loop a retry ceiling and forward
+ downstream when exhausted. Rejected: for persistence, forwarding an un-stored unit
+ acks and loses the message; for broadcast it is unnecessary (recovery already
+ handles it). A ceiling also turns transient-but-long outages into data loss during
+ *normal* operation, not just shutdown.
+
+- **Per-batch context.** Already rejected by the resilience proposal (would unwind
+ partially-completed work); not revisited.
+
+## Trade-offs & Risks
+
+- **Decision (settled): persistence drops the in-flight unit on shutdown.** On
+ shutdown with a dead database, `persistence` stops retrying and drops the
+ in-flight unit (clean shutdown; the un-acked MQ delivery is redelivered, or the
+ HTTP client retries). The rejected alternative — keep retrying forever, bounded
+ only by orchestrator SIGKILL — is retained in *Alternatives considered* for the
+ record. Broadcast is unaffected by this choice (forward-on-cancel either way).
+
+- **Dropped-unit caller blocks until process exit.** When persistence
+ drops a unit on shutdown, its `complete()` never fires, so an in-flight
+ `Handle`/`await` caller stays blocked on `waiter.Wait()`. This is acceptable during
+ shutdown — the consumer cancels `this.ctx`, the MQ delivery framework tears those
+ goroutines down, and the un-acked message is redelivered. It does mean a
+ `goleak`-style "no leaked goroutines" assertion must not be applied across the
+ shutdown-abandonment path.
+
+- **Cancellation depends on collaborators honoring `ctx`.** If `Writer.Write` /
+ `Dispatcher.Dispatch` block forever ignoring the passed `ctx`, our loop cannot
+ reach the `wait` check until that call returns. The `sqladapter` Writer/Dispatcher
+ use `BeginTx(ctx)`/`ExecContext(ctx)`/ctx-scoped connector calls, so they abort
+ promptly on cancellation. Worth stating as a documented contract for custom
+ collaborators.
+
+- **Requires the consumer to cancel the pipeline ctx on shutdown.** The retry loops
+ abort on `this.ctx` cancellation; the channel-close cascade alone does not cancel
+ `this.ctx`. If a consumer wires `harness.New` with a `context.Background()` that is
+ never cancelled, the in-flight retry remains unbounded (it still drains correctly
+ on the *next* unit boundary, but the currently-held unit can still hang). This is
+ the intended contract per the resilience proposal; it should be called out in the
+ package doc.
+
+- **Deterministic tests are straightforward here** (unlike the waiter-pool fix): the
+ injected `wait` seam returns a chosen error synchronously, so abandonment is
+ exercised without real timing or concurrency races.
+
+- **No change to the happy path or to the fixed-1s backoff.** Existing retry tests
+ keep asserting two 1-second waits before success; only the seam's name/signature
+ changes.
+
+## Implementation Checklist
+
+### Phase 1: Cancellable-wait seam (refactor, stays green)
+
+- [x] Add `handlers/harness/retry.go` with the `wait(ctx context.Context, d time.Duration) error` helper shown in Approach.
+- [x] In `pipeline.go`, change `newPersistence(...)` and `newBroadcast(...)` call sites to pass `wait` instead of `time.Sleep`.
+- [x] In `03_persistence.go` and `05_broadcast.go`, change the struct field and constructor parameter from `sleep func(time.Duration)` to `wait func(context.Context, time.Duration) error` (no behavior change yet — call `this.wait(this.ctx, time.Second)` and ignore the returned error so the loops still spin forever).
+- [x] In both `*_test.go` fixtures, rename the `sleep`/`sleeps` seam to a `wait`/`waits` method matching the new signature, recording durations and returning `nil`.
+- [x] Run `make test` — confirm green (pure refactor; existing `TestRetriesUntil…Succeeds` still sees two recorded 1s waits).
+
+### Phase 2: Broadcast cancellation — forward on shutdown (red → green)
+
+- [x] Add `BroadcastAbandoned struct{ Attempts int }` to `contracts.go`.
+- [x] Add `TestBroadcastAbandonsOnContextCancelButStillForwards`: dispatcher always fails; fixture `wait` returns `context.Canceled` on its first call. Expect failure (compile error — `BroadcastAbandoned` referenced before the loop emits it, or assertion fails because the current loop ignores the wait error and spins). Record the failure reason.
+- [x] Run the new test, confirm it fails for the right reason. (Observed: 5s `-timeout` kill — the loop ignored the wait error and spun forever against the always-failing dispatcher, never forwarding or tracking abandonment.)
+- [x] Implement: extract `dispatch()`; on `this.wait(...) != nil`, `Track(BroadcastAbandoned{Attempts: attempt})` and return; `Listen()` still forwards the unit to `output` afterward.
+- [x] Run the test — confirm: exactly one `Dispatch` call, one recorded wait, one `BroadcastError` then one `BroadcastAbandoned` tracked, and the unit **was** forwarded to `output`.
+- [x] Confirm `TestRetriesUntilDispatchSucceeds` and the other broadcast tests still pass.
+
+### Phase 3: Persistence cancellation — drop without forwarding (red → green)
+
+- [x] Add `PersistenceAbandoned struct{ Attempts int }` to `contracts.go`.
+- [x] Add `TestPersistenceAbandonsOnContextCancelAndDropsUnit`: writer always fails; fixture `wait` returns `context.Canceled` on its first call. Expect failure (the current loop ignores the wait error and spins, so the test would hang / never see abandonment). Record the failure reason.
+- [x] Run the new test, confirm it fails for the right reason. (Observed: 5s `-timeout` kill — the loop ignored the wait error and spun forever against the always-failing writer, never dropping the unit or tracking abandonment.)
+- [x] Implement: extract `store() bool`; on `this.wait(...) != nil`, `Track(PersistenceAbandoned{Attempts: attempt})` and return `false`; `Listen()` skips `output <- unit` (does **not** forward) when `store()` returns false, resets the buffer, and continues the range loop.
+- [x] Run the test — confirm: exactly one `Write` call, one recorded wait, one `PersistenceError` then one `PersistenceAbandoned` tracked, and the unit was **not** forwarded to `output` (output closes empty after input closes).
+- [x] Confirm `TestRetriesUntilWriteSucceeds` and the other persistence tests still pass.
+
+### Phase 4: Full verification & conventions
+
+- [x] Run `make test` (fmt, vet, `-race`, coverage) — confirm green and that `handlers/harness` coverage has not regressed. (All packages pass; `handlers/harness` coverage 97.7%, unchanged.)
+- [x] Add a short note to the `package harness` doc comment in `config.go`: the persistence/broadcast retry loops abort on cancellation of the context passed to `New(ctx, …)`; consumers must cancel it on shutdown, and custom `Writer`/`Dispatcher` implementations must honor the context they are given.
+- [x] Re-read the diff against the `CLAUDE.md` Go conventions: receiver named `this`; named slice/return values where applicable; no naked returns; no blank lines at method start/end; struct initializers use field/value pairs; multi-line struct literals close the brace on their own line.
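
Reviewer sketch (not part of the patch): the cancellable `wait` seam and the drop-on-cancel loop described in the checklist above could look roughly like this. The proposal's actual `retry.go` is not shown in this section, so the body of `wait`, the free-standing `store` function, and the demo helpers (`abandonOnCancel`, `succeedAfterTwoWaits`, `cancelledWaitReturnsPromptly`) are assumptions for illustration. The real stage is a method on the persistence struct and also tracks `PersistenceError` / `PersistenceAbandoned` observations, which this sketch leaves out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// wait blocks for d, or returns early with ctx.Err() if ctx is cancelled first.
// Assumed body: only the signature appears in the checklist.
func wait(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// store retries write until it succeeds or the injected wait seam reports
// cancellation. stored == false means the unit was abandoned and must not be
// forwarded downstream.
func store(ctx context.Context, write func() error, wait func(context.Context, time.Duration) error) (attempts int, stored bool) {
	for attempts = 1; ; attempts++ {
		if err := write(); err == nil {
			return attempts, true
		}
		if err := wait(ctx, time.Second); err != nil {
			return attempts, false
		}
	}
}

// abandonOnCancel mirrors the Phase 3 test: the writer always fails and the
// fake wait returns context.Canceled on its first call.
func abandonOnCancel() (attempts int, stored bool) {
	failing := func() error { return errors.New("db down") }
	cancelled := func(context.Context, time.Duration) error { return context.Canceled }
	return store(context.Background(), failing, cancelled)
}

// succeedAfterTwoWaits mirrors the existing retry test: two failures, two
// recorded waits, then success. The fake wait never sleeps.
func succeedAfterTwoWaits() (attempts int, waits int, stored bool) {
	calls := 0
	write := func() error {
		calls++
		if calls < 3 {
			return errors.New("transient")
		}
		return nil
	}
	record := func(context.Context, time.Duration) error {
		waits++
		return nil
	}
	attempts, stored = store(context.Background(), write, record)
	return attempts, waits, stored
}

// cancelledWaitReturnsPromptly checks the real helper against an
// already-cancelled context; it must not sleep for the full hour.
func cancelledWaitReturnsPromptly() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return errors.Is(wait(ctx, time.Hour), context.Canceled)
}

func main() {
	attempts, stored := abandonOnCancel()
	fmt.Println("abandon:", attempts, stored)
	attempts, waits, stored := succeedAfterTwoWaits()
	fmt.Println("retry:", attempts, waits, stored)
	fmt.Println("prompt cancel:", cancelledWaitReturnsPromptly())
}
```

Injecting the wait function is what keeps the abandonment tests deterministic: the fake returns its error synchronously, so no real timer or goroutine race is involved. Broadcast would use the same loop shape but forward the unit either way.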
diff --git a/doc/work-sessions/2026/2026-06-01_16-14-19-proposal-dispatch-content-encoding.md b/doc/work-sessions/2026/2026-06-01_16-14-19-proposal-dispatch-content-encoding.md
new file mode 100644
index 0000000..636cb50
--- /dev/null
+++ b/doc/work-sessions/2026/2026-06-01_16-14-19-proposal-dispatch-content-encoding.md
@@ -0,0 +1,336 @@
+---
+name: Stop double-encoding payloads across dispatch and recovery
+description: Eliminate the redundant second serialization in the sqladapter Dispatcher and propagate ContentType through the in-memory harness Message — closing the TODO at handlers/harness/sqladapter/dispatcher.go:58 and fixing the recovery-path content-type and Topic mishandling. No schema changes.
+type: plot
+---
+
+# Proposal: Stop double-encoding payloads across dispatch and recovery
+
+## Background
+
+The harness pipeline is store-and-forward: each message is serialized once
+(stage 02), durably stored (stage 03), then published to the broker (stage 05
+via `Dispatcher.Dispatch`). On startup, `sqladapter.Recover` re-publishes any
+row whose `dispatched` column is still `NULL`.
+
+The bytes that get stored in the `Messages.payload` column **must** be
+byte-identical to the bytes that get published to the broker; otherwise a recovery
+re-publishes a different message than the original send, silently. Today the
+code does not guarantee this, and in one configuration it cannot publish a
+recovered row at all.
+
+There are three connected defects across `handlers/harness/02_serialization.go`,
+`handlers/harness/sqladapter/dispatcher.go`, and
+`handlers/harness/sqladapter/recovery.go`. They are usually invisible (the
+production serializer is JSON, which makes the bug benign) and they are masked
+by the unit-test stub connector — but they are real, and recovery against a
+real RabbitMQ writer is broken today.
+
+### Defect 1: payload is encoded twice on the happy path
+
+`02_serialization.go:29` encodes `message.Value` into `message.Content` (these
+are the bytes destined for the durable row).
+
+`sqladapter/dispatcher.go:64-67` then builds a `messaging.Dispatch` with
+`Message: message.Value` (the in-memory Go struct) and passes it to the
+transport `Writer`. The transport writer is the `serialization.defaultWriter`
+(`serialization/connector.go:103`), which, in a loop, calls into
+`defaultDispatchEncoder.Encode` (`serialization/dispatch_encoder.go:30`). That
+encoder sees `dispatch.Payload` is empty and `dispatch.Message` is non-nil, and
+**runs the same `Serializer.Serialize` again** to produce
+`dispatch.Payload`/`dispatch.ContentType`/`dispatch.MessageType`.
+
+So the value is serialized once for the DB, then again for the broker. Both
+encodings happen with the same `Serializer`, so the resulting bytes *are* equal,
+but the harness has no guarantee of that — anyone who plugs in a serializer
+with side effects, IDs, or timestamps will get divergent stored vs. published
+bytes. And it is wasted CPU on every send.
+
+The TODO at `dispatcher.go:58-63` calls this out and asks for one of two fixes:
+
+> Either pass the pre-encoded bytes through Dispatch.Payload/MessageType/ContentType
+> and skip the connector's serialization for this writer, or drop the harness
+> Serialization stage and let the connector own all encoding.
+
+### Defect 2: `Message.ContentType` is never populated on the happy path
+
+`harness.Message` already has `ContentType` and `ContentEncoding` fields
+(`message.go:22-28`) — but `02_serialization.go` only writes
+`message.ContentType` on the **fallback** path (when the user's serializer
+returned an error and we fall back to `fmt.Sprintf("%#v")`):
+
+```go
+err := this.serializer.Serialize(message.Content, message.Value)
+if err != nil {
+ // …
+ message.ContentType = "go fmt.Sprintf(%#v)"
+ _, _ = fmt.Fprintf(message.Content, "%#v", message.Value)
+}
+```
+
+On the success path `message.ContentType` is left as the zero value `""`. The
+field exists, but the dispatcher never reads it.
+
+This is fine *today* because `dispatcher.go` re-encodes via the connector
+(Defect 1), so the connector reapplies its own `ContentType()`. The moment we
+fix Defect 1 by passing pre-encoded bytes through, we have to know what the
+content type is — and the harness never recorded it. The fix lives entirely
+in process memory: stage 02 stamps `message.ContentType` from
+`serializer.ContentType()` so the dispatcher (next stage in the same process)
+can read it. **No schema change.**
+
+### Defect 3: recovery hands the dispatcher a malformed message and breaks against a real broker
+
+`recovery.go:42-47` builds a `harness.Message` from the row:
+
+```go
+messages = append(messages, &harness.Message{
+ ID: id,
+ Type: typeName,
+ Content: bytes.NewBuffer(payload),
+ ContentType: "application/json",
+})
+```
+
+`Value` is left nil. The dispatcher today (per Defect 1) passes
+`Message: message.Value` to the connector — which means recovery passes
+`Message: nil` and `Payload: nil`. The connector's encoder takes its
+`dispatch.Message == nil` early-return (`dispatch_encoder.go:31`) and **never
+sets `Topic` / `MessageType` / `ContentType`**. The RabbitMQ writer then
+rejects with `ErrEmptyDispatchTopic` (`rabbitmq/writer.go:38`). So in the only
+production configuration that uses `topicFromMessageType=true`, recovery can't
+publish a single row. The current dispatcher unit tests don't catch this
+because the stub connector (`dispatcher_test.go:111`) ignores the topic field
+entirely.
+
+Defect 3 is **fixed automatically** once Defect 1 is fixed: the new dispatcher
+reads `Type` / `ContentType` / `Content` (which recovery already populates)
+from the `*harness.Message` and hands them to the connector as pre-populated
+`MessageType` / `ContentType` / `Payload` / `Topic`. The connector encoder's
+`len(Payload) > 0` early-return then leaves everything alone, and the broker
+gets a complete dispatch.
+
+The hardcoded `"application/json"` in recovery is **kept** — we deliberately
+do not modify the `Messages` schema (the table holds 50M+ rows; we don't
+extend it for this), and the project's standing assumption is that the
+durable bytes are JSON. We document that assumption explicitly rather than
+carry it implicitly.
+
+### Why fix Defects 1 and 2 together
+
+Defect 2 is the prerequisite for fixing Defect 1: the dispatcher can only
+hand pre-encoded `ContentType` to the connector if the serialization stage
+recorded it. The two changes ride together, and they fix Defect 3 as a
+side effect.
+
+## Approach
+
+We pick the "pass-through" arm of the TODO: the harness Serialization stage
+remains the single source of truth for encoded bytes and content type; the
+sqladapter Dispatcher hands those pre-encoded bytes to the connector via
+`Dispatch.Payload` / `MessageType` / `ContentType`; the connector encoder
+short-circuits because `Payload` is non-empty. Recovery does the same
+pass-through — using the row's stored `type` and the project-wide assumption
+that stored bytes are `application/json`.
+
+This is the smaller, safer arm. The alternative (delete the harness
+Serialization stage entirely and let the connector own all encoding) would
+require persistence to also call the connector's encoder and would entangle
+the harness with the connector. Rejected on those grounds.
+
+### The three changes
+
+**Change A (`02_serialization.go`):** record the content type on the success
+path. We need `serializer.ContentType()` on the internal `serializer`
+interface — which today is just `Serialize(io.Writer, any) error`
+(`contracts.go:34`). Extend it:
+
+```go
+serializer interface {
+ Serialize(out io.Writer, in any) error
+ ContentType() string
+}
+```
+
+Then in `serialization.Listen`:
+
+```go
+err := this.serializer.Serialize(message.Content, message.Value)
+if err == nil {
+ message.ContentType = this.serializer.ContentType()
+} else {
+ // existing fallback unchanged: ContentType already set to the sentinel
+ // "go fmt.Sprintf(%#v)" by the fallback branch.
+}
+```
+
+The `nop` serializer in `config.go:112` gains a `ContentType() string { return "" }`
+implementation, the test fixture in `02_serialization_test.go:40` gains the
+same, and the production wiring (where callers pass
+`serialization.Serializer` from the `serialization` package — which already
+exposes `ContentType() string`, `contracts.go:9-11`) needs no changes since
+that interface already satisfies the extended one.
+
+**Change B (`sqladapter/dispatcher.go`):** stop passing `Value`; pass the
+pre-encoded fields:
+
+```go
+dispatches = append(dispatches, messaging.Dispatch{
+ Durable: true,
+ MessageType: message.Type,
+ ContentType: message.ContentType,
+ Payload: message.Content.Bytes(),
+ Topic: message.Type, // matches connector's topicFromMessageType=true behavior
+})
+```
+
+Why `Topic: message.Type` here? The connector's encoder normally sets
+`Topic = MessageType` *only when* its `topicFromMessageType` flag is on
+(`dispatch_encoder.go:55`). With `Payload` already populated, the encoder takes
+its `len(dispatch.Payload) > 0` early-return (`dispatch_encoder.go:31`) and
+never runs the topic-population block. So the dispatcher must populate Topic
+itself, and Topic equals Type for this pipeline. (If a future deployment needs
+a different topic-derivation rule, that rule will have to be wired into the
+sqladapter Dispatcher explicitly — there is no longer a shared encoder
+deciding it.) That deviation from the connector's behavior is **intentional
+and noted** as a trade-off below.
+
+**Change C (`sqladapter/recovery.go`):** no functional change; tighten the
+construction comment to acknowledge the JSON assumption.
+
+The current code already populates everything Change B's dispatcher needs:
+
+```go
+messages = append(messages, &harness.Message{
+ ID: id,
+ Type: typeName,
+ Content: bytes.NewBuffer(payload),
+ ContentType: "application/json",
+})
+```
+
+`Type` and `Content` come from the row; `ContentType` is the project-wide
+assumption that stored bytes are JSON. With Change B in place, the dispatcher
+reads these and the broker dispatch is well-formed. The change is to:
+
+- Replace the zero-value `// hardcoded` framing with a brief comment that
+ states the assumption: the durable column holds JSON because the
+ configured serializer is JSON; recovery cannot recover what was not
+ recorded, so it carries that assumption forward.
+
+That's it — no SQL or schema change.
+
+### Files modified
+
+| File | Change |
+|-------------------------------------------------------|---------------------------------------------------------------------------------------------------------|
+| `handlers/harness/contracts.go` | Extend internal `serializer` interface with `ContentType() string`. |
+| `handlers/harness/02_serialization.go` | On success, set `message.ContentType = this.serializer.ContentType()`. |
+| `handlers/harness/02_serialization_test.go` | Update fixture serializer to satisfy `ContentType() string`; assert ContentType propagates. |
+| `handlers/harness/config.go` | `nop.ContentType() string { return "" }`. |
+| `handlers/harness/sqladapter/dispatcher.go` | Pass `Payload`/`MessageType`/`ContentType`/`Topic` from `*harness.Message`; remove TODO. |
+| `handlers/harness/sqladapter/dispatcher_test.go` | Assert published Dispatch carries Payload/MessageType/ContentType/Topic; tighten stub connector. |
+| `handlers/harness/sqladapter/recovery.go` | Comment-only: state the JSON-content-type assumption inline. |
+| `handlers/harness/sqladapter/recovery_test.go` | Add a regression test asserting the recovered Dispatch carries `Topic`/`MessageType`/`Payload`/`ContentType`. |
+
+No schema files (`doc/mysql/schema.sql`, `testdb_test.go`) are touched.
+
+### Alternatives considered
+
+- **Drop the harness Serialization stage; let the connector serialize.** The
+ other arm of the TODO. Rejected: the persistence stage would have to call
+ the connector's encoder before INSERT (otherwise we lose the
+ content-type-at-store-time invariant), which entangles the harness with the
+ serialization-connector and changes the order of failure (a connector
+ encoding bug would now poison persistence rather than just dispatch).
+ Pass-through is the smaller change.
+
+- **Add a `content_type` column to `Messages` so recovery is faithful to a
+ serializer change.** Rejected: the table holds 50M+ rows, the project's
+ standing assumption is JSON, and the operational cost of an `ALTER TABLE` on
+ that table outweighs the value of recovering exotic content types we don't
+ use. The assumption is documented explicitly in code instead.
+
+- **Have the dispatcher consult the connector's `topicFromMessageType` setting
+ to decide whether to populate Topic.** Rejected: the sqladapter dispatcher
+ has no handle to that config, the configuration is owned by a different
+ package, and the dispatcher's contract is "publish what was persisted." For
+ this pipeline, Topic = Type is the rule; encoding it explicitly in the
+ dispatcher makes the rule visible at the call site instead of hidden in
+ another package's option flag.
+
+## Trade-offs & Risks
+
+- **Recovery carries a JSON assumption, not a recorded fact.** With no
+ `content_type` column on `Messages`, recovery cannot know what serializer
+ was in effect when a row was written; it assumes `application/json`. If
+ the configured serializer is ever changed to something else, in-flight
+ rows persisted under the old serializer will be re-dispatched on next
+ startup tagged `application/json` regardless of what they actually contain.
+ Mitigation: drain the queue (no `dispatched IS NULL` rows) before swapping
+ serializers. Acceptable risk because the project has historically used JSON
+ and has no current plan to change.
+
+- **Topic = MessageType is now hardcoded in the sqladapter Dispatcher.** This
+ matches the connector's default behavior for the existing deployments
+ (`topicFromMessageType=true`) but it removes the option flag's reach for the
+ pre-encoded path. If a future deployment needs `Topic != MessageType`, the
+ rule has to be wired explicitly into the dispatcher (a function, a map, or
+ a new field on `harness.Message`). Worth flagging in a code comment so a
+ future reader understands why the topic rule was duplicated here.
+
+- **The connector encoder's `len(Payload) > 0` early-return is now load-bearing
+ for correctness, not just optimization.** The contract becomes: "if Payload
+ is set, leave the dispatch alone; the caller has populated everything that
+ the broker needs." That's already its behavior
+ (`dispatch_encoder.go:31`), and a short test in
+ `serialization/dispatch_encoder_test.go` already asserts this
+ (`TestWhenDispatchAlreadyContainsSerializedPayload_Nop`); we'll cross-reference it in
+ a comment but not duplicate it.
+
+- **The dispatcher unit test that previously asserted `published[0].Message`
+ must change** — the stub connector observes `Message` today, but after the
+ change the dispatcher passes `Payload` and leaves `Message` nil. The test
+ becomes: assert `Payload` is the persisted bytes, `MessageType` is the
+ registered name, `ContentType` is what the harness recorded, `Topic` equals
+ MessageType, `Durable` is true.
+
+- **Coverage and shape of behavior tests are otherwise unchanged.** No new
+ goroutine semantics, no concurrency-shape changes; this is a pure data-flow
+ refactor — strictly in-process, no schema migration.
+
+## Implementation Checklist
+
+### Phase 1: ContentType on the internal serializer interface (red → green)
+
+- [ ] Extend `02_serialization_test.go` fixture: add `ContentType() string { return "test/content-type" }` and a new test `TestSerializesEachResultValueIntoContent_PopulatesContentTypeOnSuccess` asserting `units[0].results[0].ContentType == "test/content-type"`.
+- [ ] Run tests, confirm failure (assertion fails: success path leaves `ContentType` empty).
+- [ ] Extend `serializer` interface in `handlers/harness/contracts.go` to include `ContentType() string`.
+- [ ] Add `ContentType() string { return "" }` to `nop` in `config.go` so the default still satisfies the interface.
+- [ ] In `02_serialization.go`, on the success branch, set `message.ContentType = this.serializer.ContentType()`. Leave the fallback branch's existing `"go fmt.Sprintf(%#v)"` assignment alone.
+- [ ] Run tests, confirm green. Verify the fallback test (`TestSerializerErrorIsTracked_FallbackToFmtSprintfEncoding`) still asserts the `"go fmt.Sprintf(%#v)"` ContentType.
+
+### Phase 2: Dispatcher pass-through (red → green)
+
+- [ ] Add `TestDispatch_PublishesPreEncodedPayloadAndMetadata` to `dispatcher_test.go`: extend the `seedMessage` helper (or inline its setup) so the `*harness.Message` carries `Type`, `ContentType` (e.g. `"application/json"`), and a pre-encoded `Content` buffer. Assert the recorded `messaging.Dispatch` has `Payload == message.Content.Bytes()`, `MessageType == message.Type`, `ContentType == message.ContentType`, `Topic == message.Type`, `Durable == true`, and `Message == nil`.
+- [ ] Run tests, confirm failure (today the dispatcher sends `Message: message.Value` and leaves Payload/MessageType/ContentType/Topic blank).
+- [ ] Edit `sqladapter/dispatcher.go:55-71`:
+ - Drop the TODO comment block.
+ - Change the `messaging.Dispatch` literal to `{Durable: true, MessageType: message.Type, ContentType: message.ContentType, Payload: message.Content.Bytes(), Topic: message.Type}`.
+ - Add a one-line comment explaining that Topic = Type because the connector encoder is short-circuited by the pre-populated Payload (cross-reference `serialization/dispatch_encoder_test.go:TestWhenDispatchAlreadyContainsSerializedPayload_Nop`).
+- [ ] Run tests, confirm green. Update `TestDispatch_PublishesAndMarksDispatched` so its `published[0].Message` assertion is replaced with assertions on Payload/MessageType/ContentType/Topic.
+- [ ] Confirm `TestDispatch_PublishFails_ReturnsErrorWithoutMarkingDispatched` and `TestDispatch_NoMessages_NoOp` still pass.
+
+### Phase 3: Recovery regression test + comment (red → green)
+
+- [ ] Add `TestRecover_PublishesDispatchWithTopicMessageTypeAndPayload`: seed an undispatched row with `type='order-received'` and a JSON payload; run `Recover`; assert the resulting `messaging.Dispatch` has `Topic == "order-received"`, `MessageType == "order-received"`, `ContentType == "application/json"`, `Payload` equal to the seeded payload bytes, `Durable == true`, and `Message == nil`.
+- [ ] Run the test. With Phase 2 in place, this test should already pass (recovery's `*harness.Message` is well-formed and the new dispatcher reads it correctly). If it fails, that is the right reason to fix it before continuing.
+- [ ] Edit `sqladapter/recovery.go:42-47`: replace the inline framing of `ContentType: "application/json"` with a comment that states the assumption — the project's serializer is JSON; the durable bytes are recorded without a content-type column, so recovery carries that assumption forward.
+- [ ] Run tests, confirm green. Confirm `TestRecover_NoOrphans_NoOp`, `TestRecover_DispatchesUndispatchedRowsInIDOrder`, `TestRecover_PassesPayloadAndTypeIntoMessage`, `TestRecover_RowsExceedBatchSize_FlushesInBatchesAndDispatchesAll`, and `TestRecover_RowCountIsMultipleOfBatchSize_FlushesUniformBatches` all still pass.
+
+### Phase 4: Full verification
+
+- [ ] Run `make test` (fmt, vet, `-race`, coverage). Confirm green and that `handlers/harness` and `handlers/harness/sqladapter` coverage have not regressed.
+- [ ] Re-read the diff against the `CLAUDE.md` Go conventions: receiver named `this`; named slice/return values where applicable; no naked returns; no blank lines at method start/end; struct initializers use field/value pairs; multi-line struct literals close the brace on their own line.
+- [ ] Sanity-check the diffs against the three defects in Background: (a) no double encode (dispatcher passes pre-encoded bytes; connector encoder short-circuits on `len(Payload) > 0`), (b) ContentType propagates in-process from serialization → message → dispatch, (c) recovery's `*harness.Message` now flows through the dispatcher to a well-formed broker dispatch (no more `ErrEmptyDispatchTopic`).
+- [ ] Confirm package doc comment in `handlers/harness/sqladapter/dispatcher.go` still accurately describes the columns it depends on (`id`, `dispatched`, `type`, `payload`); it has not changed.
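
Reviewer sketch (not part of the patch): the data flow that Changes A and B describe can be modeled end to end as below. `Message`, `Dispatch`, `jsonSerializer`, `toDispatch`, and `encode` are simplified stand-ins written for this illustration, not the real `harness`, `messaging`, or `serialization` types. In particular, `encode` only imitates the two early-returns the proposal attributes to the connector encoder. Under those assumptions, a freshly sent message and a recovered row produce the same dispatch.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// Message and Dispatch model only the fields named in the proposal.
type Message struct {
	Type        string
	ContentType string
	Content     *bytes.Buffer
	Value       any
}

type Dispatch struct {
	Durable     bool
	Topic       string
	MessageType string
	ContentType string
	Payload     []byte
	Message     any
}

// serializer is the extended internal interface from Change A.
type serializer interface {
	Serialize(out io.Writer, in any) error
	ContentType() string
}

// jsonSerializer is a hypothetical implementation used only for this demo.
type jsonSerializer struct{}

func (jsonSerializer) Serialize(out io.Writer, in any) error {
	raw, err := json.Marshal(in)
	if err != nil {
		return err
	}
	_, err = out.Write(raw)
	return err
}

func (jsonSerializer) ContentType() string { return "application/json" }

// serialize sketches Change A: stamp ContentType on the success path and keep
// the fmt-based fallback on error.
func serialize(s serializer, message *Message) {
	message.Content = &bytes.Buffer{}
	if err := s.Serialize(message.Content, message.Value); err != nil {
		message.ContentType = "go fmt.Sprintf(%#v)"
		_, _ = fmt.Fprintf(message.Content, "%#v", message.Value)
		return
	}
	message.ContentType = s.ContentType()
}

// toDispatch sketches Change B: pass the pre-encoded fields, never Value.
func toDispatch(message *Message) Dispatch {
	return Dispatch{
		Durable:     true,
		MessageType: message.Type,
		ContentType: message.ContentType,
		Payload:     message.Content.Bytes(),
		Topic:       message.Type,
	}
}

// encode imitates the connector encoder's short-circuit: a populated Payload
// (or a nil Message) leaves the dispatch untouched.
func encode(dispatch *Dispatch) error {
	if len(dispatch.Payload) > 0 || dispatch.Message == nil {
		return nil
	}
	raw, err := json.Marshal(dispatch.Message)
	if err != nil {
		return err
	}
	dispatch.Payload = raw
	dispatch.ContentType = "application/json"
	return nil
}

// sentDispatch follows the happy path: serialize once, then pass through.
func sentDispatch() Dispatch {
	message := &Message{Type: "order-received", Value: map[string]int{"id": 1}}
	serialize(jsonSerializer{}, message)
	dispatch := toDispatch(message)
	_ = encode(&dispatch)
	return dispatch
}

// recoveredDispatch follows the recovery path: Value is nil, Content comes
// from the stored row, ContentType is the JSON assumption.
func recoveredDispatch() Dispatch {
	row := &Message{
		Type:        "order-received",
		ContentType: "application/json",
		Content:     bytes.NewBufferString(`{"id":1}`),
	}
	dispatch := toDispatch(row)
	_ = encode(&dispatch)
	return dispatch
}

func main() {
	for _, d := range []Dispatch{sentDispatch(), recoveredDispatch()} {
		fmt.Printf("topic=%s type=%s content-type=%s payload=%s\n", d.Topic, d.MessageType, d.ContentType, d.Payload)
	}
}
```

Because `toDispatch` never reads `Value`, the recovery path no longer depends on the encoder to fill in `Topic` or `MessageType`, which is the mechanism by which fixing Defect 1 also resolves Defect 3.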
diff --git a/doc/work-sessions/2026/2026-06-02_12-40-29-proposal-dispatch-content-encoding.html b/doc/work-sessions/2026/2026-06-02_12-40-29-proposal-dispatch-content-encoding.html
new file mode 100644
index 0000000..935e3bc
--- /dev/null
+++ b/doc/work-sessions/2026/2026-06-02_12-40-29-proposal-dispatch-content-encoding.html
@@ -0,0 +1,334 @@
+
+
+
+
+Proposal: Stop double-encoding payloads across dispatch and recovery
+
+
+
+
Proposal: Stop double-encoding payloads across dispatch and recovery
+
+
Background
+
+ The harness pipeline is store-and-forward: each message is serialized once
+ (stage 02), durably stored (stage 03), then published to the broker (stage 05
+ via Dispatcher.Dispatch). On startup, sqladapter.Recover
+ re-publishes any row whose dispatched column is still NULL.
+
+
+ The bytes stored in the Messages.payload column must
+ be byte-identical to the bytes published to the broker — otherwise a recovery
+ re-publishes a different message than the original send, silently. Today the
+ code does not guarantee this, and in one configuration it cannot publish a
+ recovered row at all.
+
+
+ There are three connected defects across handlers/harness/02_serialization.go,
+ handlers/harness/sqladapter/dispatcher.go, and
+ handlers/harness/sqladapter/recovery.go. They are usually invisible
+ (the production serializer is JSON, which makes the bug benign) and they are masked
+ by the unit-test stub connector — but they are real, and recovery against a real
+ RabbitMQ writer is broken today.
+
+
+
Defect 1: payload is encoded twice on the happy path
+
+ 02_serialization.go:29 encodes message.Value into
+ message.Content (these are the bytes destined for the durable row).
+
+
+ sqladapter/dispatcher.go:64-67 then builds a messaging.Dispatch
+ with Message: message.Value (the in-memory Go struct) and passes it to the
+ transport Writer. The transport writer is the serialization.defaultWriter
 (serialization/connector.go:103), which, in a loop, calls into
+ defaultDispatchEncoder.Encode (serialization/dispatch_encoder.go:30).
+ That encoder sees dispatch.Payload is empty and dispatch.Message
+ is non-nil, and runs the same Serializer.Serialize again
+ to produce dispatch.Payload/dispatch.ContentType/dispatch.MessageType.
+
+
+ So the value is serialized once for the DB, then again for the broker. Both encodings
+ happen with the same Serializer, so the resulting bytes are equal,
+ but the harness has no guarantee of that — anyone who plugs in a serializer with side
+ effects, IDs, or timestamps will get divergent stored vs. published bytes. And it is
+ wasted CPU on every send.
+
+
+ The TODO at dispatcher.go:58-63 calls this out and asks for one of two fixes:
+ either pass the pre-encoded bytes through Dispatch.Payload/MessageType/ContentType
+ and skip the connector's serialization for this writer, or drop the harness Serialization
+ stage and let the connector own all encoding.
+
+
+
Defect 2: Message.ContentType is never populated on the happy path
+
+ harness.Message already has ContentType and ContentEncoding
+ fields (message.go:22-28) — but 02_serialization.go only writes
+ message.ContentType on the fallback path (when the user's
+ serializer returned an error and we fall back to fmt.Sprintf("%#v")).
+ On the success path message.ContentType is left as the zero value "".
+
+
+ This is fine today because dispatcher.go re-encodes via the connector
+ (Defect 1), so the connector reapplies its own ContentType(). The moment we
+ fix Defect 1 by passing pre-encoded bytes through, we must know what the content type is —
+ and the harness never recorded it. The fix: stage 02 stamps message.ContentType
+ from serializer.ContentType() so the dispatcher (next stage in the same process)
+ can read it. No schema change.
+
+
+
Defect 3: recovery hands the dispatcher a malformed message and breaks against a real broker
+
+ recovery.go:42-47 builds a harness.Message from the row with
+ Value left nil. The dispatcher today (per Defect 1) passes
+ Message: message.Value to the connector — which means recovery passes
+ Message: nil and Payload: nil. The connector's encoder takes its
+ dispatch.Message == nil early-return (dispatch_encoder.go:31) and
+ never sets Topic / MessageType / ContentType.
+ The RabbitMQ writer then rejects with ErrEmptyDispatchTopic
+ (rabbitmq/writer.go:38). So in the only production configuration that uses
+ topicFromMessageType=true, recovery can't publish a single row.
+
+
+ Defect 3 is fixed automatically once Defect 1 is fixed: the new dispatcher
+ reads Type / ContentType / Content (which recovery
+ already populates) from the *harness.Message and hands them to the connector as
+ pre-populated MessageType / ContentType / Payload /
+ Topic. The connector encoder's len(Payload) > 0 early-return
+ then leaves everything alone, and the broker gets a complete dispatch.
+
+
+
Approach
+
+ We pick the "pass-through" arm of the TODO: the harness Serialization stage remains the
+ single source of truth for encoded bytes and content type; the sqladapter Dispatcher hands
+ those pre-encoded bytes to the connector via Dispatch.Payload /
+ MessageType / ContentType; the connector encoder short-circuits
+ because Payload is non-empty. Recovery does the same pass-through — using the
+ row's stored type and the project-wide assumption that stored bytes are
+ application/json.
+
+
+ The alternative (delete the harness Serialization stage entirely and let the connector own
+ all encoding) would require persistence to also call the connector's encoder and would
+ entangle the harness with the connector. Rejected on those grounds.
+
+
+
Change A — 02_serialization.go: record content type on success
+
+ Extend the internal serializer interface in contracts.go:34:
+
+ The nop serializer in config.go:112 gains
+ ContentType() string { return "" }. The test fixture in
+ 02_serialization_test.go:40 gains the same stub. The production wiring
+ (callers pass serialization.Serializer from the serialization
+ package, which already exposes ContentType() string at
+ contracts.go:9-11) needs no changes.
+
+
+
Change B — sqladapter/dispatcher.go: pass pre-encoded fields
+
+ Replace the current messaging.Dispatch literal (which passes Message: message.Value)
+ with:
+
+ Topic: message.Type is required because the connector encoder normally sets
+ Topic = MessageType only when its topicFromMessageType flag is on
+ (dispatch_encoder.go:55). With Payload already populated, the
+ encoder takes its len(dispatch.Payload) > 0 early-return
+ (dispatch_encoder.go:31) and never runs the topic-population block. So the
+ dispatcher must populate Topic itself. A one-line comment cross-references
+ serialization/dispatch_encoder_test.go:TestWhenDispatchAlreadyContainsSerializedPayload_Nop
+ to explain why.
+
+
+
Change C — sqladapter/recovery.go: comment-only
+
+ No functional change. Replace the inline framing of ContentType: "application/json"
+ with a comment that states the assumption: the project's serializer is JSON; the durable
+ bytes are recorded without a content-type column, so recovery carries that assumption
+ forward.
+
+
+
Files modified
+
+
+
+
File
+
Change
+
+
+
+
handlers/harness/contracts.go
Extend internal serializer interface with ContentType() string.
+
handlers/harness/02_serialization.go
On success, set message.ContentType = this.serializer.ContentType().
+
handlers/harness/02_serialization_test.go
Update fixture serializer to satisfy ContentType() string; assert ContentType propagates.
+
handlers/harness/config.go
nop.ContentType() string { return "" }.
+
handlers/harness/sqladapter/dispatcher.go
Pass Payload/MessageType/ContentType/Topic from *harness.Message; remove TODO.
+
handlers/harness/sqladapter/dispatcher_test.go
Assert published Dispatch carries Payload/MessageType/ContentType/Topic; tighten stub connector.
+
handlers/harness/sqladapter/recovery.go
Comment-only: state the JSON content-type assumption inline.
+
handlers/harness/sqladapter/recovery_test.go
Add regression test asserting recovered Dispatch carries Topic/MessageType/Payload/ContentType.
+
+
+
No schema files (doc/mysql/schema.sql, testdb_test.go) are touched.
+
+
Trade-offs & Risks
+
+
+ Recovery carries a JSON assumption, not a recorded fact.
+ With no content_type column on Messages, recovery cannot know
+ what serializer was in effect when a row was written; it assumes
+ application/json. If the configured serializer is ever changed, in-flight
+ rows persisted under the old serializer will be re-dispatched tagged
+ application/json regardless of what they actually contain.
+ Mitigation: drain the queue before swapping serializers. Acceptable risk because the
+ project has historically used JSON and has no current plan to change.
+
+
+ Topic = MessageType is now hardcoded in the sqladapter Dispatcher.
+ This matches the connector's default behavior for existing deployments
+ (topicFromMessageType=true) but removes the option flag's reach for the
+ pre-encoded path. If a future deployment needs Topic != MessageType, the
+ rule has to be wired explicitly into the dispatcher.
+
+
+ The connector encoder's len(Payload) > 0 early-return is now
+ load-bearing for correctness, not just optimization. The contract becomes:
+ "if Payload is set, leave the dispatch alone; the caller has populated everything the
+ broker needs." That's already its behavior (dispatch_encoder.go:31), and
+ TestWhenDispatchAlreadyContainsSerializedPayload_Nop already asserts it;
+ we'll cross-reference it in a comment but not duplicate it.
+
+
+ The dispatcher unit test that previously asserted published[0].Message
+ must change — after the change the dispatcher passes Payload and
+ leaves Message nil. The test becomes: assert Payload is the
+ persisted bytes, MessageType is the registered name, ContentType
+ is what the harness recorded, Topic equals MessageType,
+ Durable is true.
+
+
+ Coverage and shape of behavior tests are otherwise unchanged.
+ No new goroutine semantics, no concurrency-shape changes; this is a pure data-flow
+ refactor — strictly in-process, no schema migration.
+
+
+
+
Your Turn
+
+ The step marked 🤝 in the checklist is reserved for you to implement.
+ It was chosen because it is the most interesting production-code contribution
+ in this proposal — the kind of work worth doing yourself.
+
+
What to implement
+
+ In handlers/harness/sqladapter/dispatcher.go, replace the
+ messaging.Dispatch literal that currently passes
+ Message: message.Value with one that passes the pre-encoded fields.
+ Drop the TODO comment block. The new literal is:
+
+ Add a one-line comment above the Topic field (or the append call) explaining
+ that Topic = Type because the connector encoder is bypassed when
+ Payload is pre-populated — cross-reference
+ serialization/dispatch_encoder_test.go:TestWhenDispatchAlreadyContainsSerializedPayload_Nop.
+
+
Context and interfaces
+
+ The relevant area is handlers/harness/sqladapter/dispatcher.go:55-71.
+ message is a *harness.Message; by the time Change A is
+ complete, message.ContentType will be the string returned by the
+ serializer's ContentType() method, and message.Content is the
+ *bytes.Buffer holding the encoded bytes.
+ messaging.Dispatch is defined in the messaging package —
+ Payload []byte, MessageType string,
+ ContentType string, Topic string, Durable bool,
+ Message any. Leave Message unset (nil).
+
+
What success looks like
+
+ TestDispatch_PublishesPreEncodedPayloadAndMetadata in
+ handlers/harness/sqladapter/dispatcher_test.go passes. It will assert:
+
Phase 1: ContentType on the internal serializer interface (red → green)
+
+
Extend 02_serialization_test.go fixture: add ContentType() string { return "test/content-type" } and a new test TestSerializesEachResultValueIntoContent_PopulatesContentTypeOnSuccess asserting units[0].results[0].ContentType == "test/content-type".
Extend serializer interface in handlers/harness/contracts.go to include ContentType() string.
+
Add ContentType() string { return "" } to nop in config.go so the default still satisfies the interface.
+
In 02_serialization.go, on the success branch, set message.ContentType = this.serializer.ContentType(). Leave the fallback branch's existing "go fmt.Sprintf(%#v)" assignment alone.
+
Run tests, confirm green. Verify the fallback test (TestSerializerErrorIsTracked_FallbackToFmtSprintfEncoding) still asserts the "go fmt.Sprintf(%#v)" ContentType.
+
+
+
Phase 2: Dispatcher pass-through (red → green)
+
+
Add TestDispatch_PublishesPreEncodedPayloadAndMetadata to dispatcher_test.go: extend the seedMessage helper (or inline setup) so the *harness.Message carries Type, ContentType (e.g. "application/json"), and a pre-encoded Content buffer. Assert the recorded messaging.Dispatch has Payload == message.Content.Bytes(), MessageType == message.Type, ContentType == message.ContentType, Topic == message.Type, Durable == true, and Message == nil.
+
Run tests, confirm failure (today the dispatcher sends Message: message.Value and leaves Payload/MessageType/ContentType/Topic blank).
+
🤝 Edit sqladapter/dispatcher.go:55-71: drop the TODO comment block; change the messaging.Dispatch literal to pass Durable, MessageType, ContentType, Payload, and Topic from the *harness.Message; add a one-line comment explaining Topic = Type — see Your Turn for the full spec.
+
Run tests, confirm green. Update TestDispatch_PublishesAndMarksDispatched so its published[0].Message assertion is replaced with assertions on Payload/MessageType/ContentType/Topic.
+
Confirm TestDispatch_PublishFails_ReturnsErrorWithoutMarkingDispatched and TestDispatch_NoMessages_NoOp still pass.
+
+
+
Phase 3: Recovery regression test + comment (red → green)
+
+
Add TestRecover_PublishesDispatchWithTopicMessageTypeAndPayload: seed an undispatched row with type='order-received' and a JSON payload; run Recover; assert the resulting messaging.Dispatch has Topic == "order-received", MessageType == "order-received", ContentType == "application/json", Payload == <the row's payload bytes>, Durable == true, and Message == nil.
+
Run the test. With Phase 2 in place, this test should already pass (recovery's *harness.Message is well-formed and the new dispatcher reads it correctly). If it fails, that is the right reason to fix it before continuing.
+
Edit sqladapter/recovery.go:42-47: replace the inline framing of ContentType: "application/json" with a comment stating the assumption — the project's serializer is JSON; the durable bytes are recorded without a content-type column, so recovery carries that assumption forward.
+
Run tests, confirm green. Confirm TestRecover_NoOrphans_NoOp, TestRecover_DispatchesUndispatchedRowsInIDOrder, TestRecover_PassesPayloadAndTypeIntoMessage, TestRecover_RowsExceedBatchSize_FlushesInBatchesAndDispatchesAll, and TestRecover_RowCountIsMultipleOfBatchSize_FlushesUniformBatches all still pass.
+
+
+
Phase 4: Full verification
+
+
Run make test (fmt, vet, -race, coverage). Confirm green and that handlers/harness and handlers/harness/sqladapter coverage have not regressed.
+
Re-read the diff against the CLAUDE.md Go conventions: receiver named this; named slice/return values where applicable; no naked returns; no blank lines at method start/end; struct initializers use field/value pairs; multi-line struct literals close the brace on their own line.
+
Sanity-check the diffs against the three defects: (a) no double encode — dispatcher passes pre-encoded bytes; connector encoder short-circuits on len(Payload) > 0; (b) ContentType propagates in-process from serialization → message → dispatch; (c) recovery's *harness.Message now flows through the dispatcher to a well-formed broker dispatch (no more ErrEmptyDispatchTopic).
+
Confirm package doc comment in handlers/harness/sqladapter/dispatcher.go still accurately describes the columns it depends on (id, dispatched, type, payload); it has not changed.
+ This branch introduces a new handlers/harness package: a staged, store-and-forward
+ message-handling pipeline composed of seven goroutine stages connected by buffered channels
+ (entrypoint → execution → serialization → persistence → completion → broadcast → terminal).
+ It also adds handlers/harness/sqladapter, a reference MySQL implementation of the
+ Writer and Dispatcher interfaces, backed by a new Messages
+ schema in doc/mysql/schema.sql.
+ Supporting infrastructure includes a Docker Compose file for local MySQL testing, new
+ Makefile targets for database-backed integration tests, and updated go.mod/
+ go.sum adding github.com/go-sql-driver/mysql and
+ github.com/smarty/gunit/v2.
+ All 30 commits are additions only — no existing code was modified.
+ Test coverage is extensive: every pipeline stage, the router/scanner, the admission adapter,
+ the SQL writer, dispatcher, and recovery function all have unit or integration tests.
+
+
+
Changed Files
+
+
Makefile — adds test.db and test.db.local targets for running integration tests with and without a local Docker MySQL instance.
+
doc/docker-compose.yml — MySQL 8.0 service definition with a health check used by test.db.local.
+
doc/mysql/schema.sql (renamed from sqlmq/_schema_mysql.sql) — Messages table schema plus a unique index on (dispatched, id).
+
go.mod / go.sum — adds github.com/go-sql-driver/mysql v1.10.0, github.com/smarty/gunit/v2 v2.1.0, and their transitive dependency filippo.io/edwards25519.
+
handlers/harness/contracts.go — exported interfaces (Writer, Dispatcher, Monitor), unexported internal interfaces, all monitor observation types, sentinel errors, and shared value types (batch, unitOfWork).
handlers/harness/config.go — package-level doc comment; New(ctx, options...) constructor; Options singleton with typed option setters; nop zero-value implementation of all collaborator interfaces.
+
handlers/harness/pipeline.go — build(ctx, cfg) wires all stages and returns the handler+listeners.
+
handlers/harness/00_entrypoint.go — entrypoint: implements messaging.Handler and io.Closer; exposes Handle (blocks until durable), await (context-cancellable), admit (load-shedding check), and Listen (blocks until closed).
+
handlers/harness/01_execution.go — execution: reads batches, calls executor per message, flushes to unitOfWork at max size.
+
handlers/harness/02_serialization.go — serialization: serializes each result value; on error falls back to fmt.Sprintf("%#v") and tracks SerializationError.
+
handlers/harness/03_persistence.go — persistence: writes results to Writer, retries on failure; drops unit if context is cancelled before success.
+
handlers/harness/04_completion.go — completion: fires all completion callbacks (acknowledging upstream batches) before forwarding.
+
handlers/harness/05_broadcast.go — broadcast: dispatches results to Dispatcher, retries on failure; forwards unit even if abandoned (context cancelled).
+
handlers/harness/06_terminal.go — terminal: drains the final channel to allow upstream close propagation.
+
handlers/harness/admission.go — Admission HTTP middleware wrapping the load-shedding check; AsHTTPHandler adapter bridging awaiter to messaging.Handler.
+
handlers/harness/routing.go + scanner.go — router and scan: reflective discovery of Execute*/Apply* methods on registered domain types; self-applicator exclusion logic.
handlers/harness/pool.go — generic poolT[T] wrapper around sync.Pool.
+
handlers/harness/retry.go — wait(ctx, d) helper: sleeps for d or returns early on context cancellation.
+
handlers/harness/*_test.go — unit tests for every stage, the router, the pipeline integration, config defaults, and the admission middleware.
+
handlers/harness/sqladapter/writer.go — Writer: transactional INSERT into Messages, populates Message.ID from auto-increment; supports a legacyWrite escape hatch within the same transaction.
+
handlers/harness/sqladapter/dispatcher.go — Dispatcher: publishes pre-encoded payloads via messaging.Connector, then marks rows dispatched.
+
handlers/harness/sqladapter/recovery.go — Recover: at-startup scan for undispatched rows, re-dispatches in pages.
+
handlers/harness/sqladapter/*_test.go — integration tests for Writer, Dispatcher, and Recover against a real MySQL database.
+
doc/work-sessions/2026/ (proposals and proposals-as-HTML) — five research/proposal documents tracking the design iteration for this feature (not production code).
+
+
+
Concerns
+
+
+ Asymmetric retry semantics between persistence and broadcast.
+ persistence.store() returns false and skips forwarding the unit if the
+ context is cancelled — this means the batch is not acknowledged and the MQ will redeliver.
+ broadcast.dispatch() on the other hand always forwards the unit even when abandoned
+ (BroadcastAbandoned), meaning the batch is acknowledged via
+ completion even though dispatch was skipped. The comment in 03_persistence.go
+ explicitly notes "do NOT forward (no ack); MQ redelivers", but there is no equivalent comment in
+ 05_broadcast.go explaining the intentional difference. If this asymmetry is deliberate
+ (an "at-least-once write, best-effort dispatch" guarantee), it should be documented prominently
+ before merge — it is a subtle and consequential contract that future maintainers will need to
+ understand.
+
+
+ Unchecked waiter.Put after Handle returns.
+ In entrypoint.Handle (00_entrypoint.go:70), defer this.waiters.Put(waiter)
+ recycles the waiter immediately after waiter.Wait() returns. This is correct because
+ Handle does not honor context cancellation and always waits for complete().
+ However the method comment does not state this invariant. The contrast with await's
+ careful non-put-on-departure logic (well-commented) makes Handle's unconditional defer
+ look like a potential oversight to a reader. A brief comment on Handle explaining
+ "Handle ignores context cancellation and always waits — safe to defer Put here" would close this
+ gap.
+
+
+ sqladapter.Recover has a hardcoded ContentType.
+ In sqladapter/recovery.go:46, recovered messages are stamped with
+ ContentType: "application/json" unconditionally, with a comment noting
+ "no content_type column exists to record otherwise". If the configured serializer is ever
+ changed to a non-JSON format, this default will silently misdeclare the recovered payloads.
+ The TODO: make this a Listener and TODO: retry w/ backoff comments
+ confirm this code is not intended to be its final form; however, the hardcoded content-type is
+ a data-correctness issue that should be tracked explicitly (e.g., a TODO or a schema change),
+ not just left as a silent assumption.
+
+
+ Stride-based ID assignment in sqladapter.Writer relies on fragile MySQL behavior.
+ sqladapter/writer.go:84–89 assigns Message.ID values as
+ firstID + i * stride, citing the MySQL docs that LAST_INSERT_ID()
+ returns the first auto-increment value of a multi-row insert. This is only reliable when
+ innodb_autoinc_lock_mode = 1 (the default in MySQL 5.x) or when
+ auto_increment_increment = stride. Under MySQL 8.x with
+ innodb_autoinc_lock_mode = 2 (the new default), IDs for a bulk insert are
+ usually allocated in sequence, but that is observed behavior rather than a guarantee: the MySQL manual
+ warns that applications should not depend on consecutive IDs from a single INSERT
+ when in interleaved lock mode. The existing comment and the stride feature are clearly
+ deliberate; adding a comment documenting the MySQL lock-mode requirement would prevent a
+ future breakage that would be difficult to diagnose.
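+
+ For reference, the arithmetic under discussion is small. The sketch below is not the code in
+ writer.go; assignIDs is a hypothetical helper that applies the
+ firstID + i * stride rule and is only valid if the server really did allocate the
+ batch's IDs at a fixed stride.
+
```go
package main

import "fmt"

// assignIDs derives one ID per inserted row from the first generated ID.
// It assumes the server allocated the batch's IDs at a fixed stride with no gaps.
func assignIDs(firstID, stride uint64, count int) (ids []uint64) {
	ids = make([]uint64, 0, count)
	for i := 0; i < count; i++ {
		ids = append(ids, firstID+uint64(i)*stride)
	}
	return ids
}

func main() {
	fmt.Println(assignIDs(101, 2, 3))
}
```
+
+ If another statement interleaves its allocations with the batch, every ID after the first
+ can be wrong without any error being raised, which is why the lock-mode requirement deserves
+ a comment.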
+
+
+ No upper bound on retry loops in persistence and broadcast.
+ Both persistence.store() and broadcast.dispatch() loop forever until
+ success or context cancellation. The commit message for 574396f ("Bound the
+ unbounded retry loops") and the inline TODO: exponential back-off w/ jitter
+ comments confirm this is a known gap. This is an acknowledged TODO, but it is worth flagging
+ in review since an unresponsive Writer or Dispatcher with a live context will hold a pipeline
+ stage open indefinitely, blocking all downstream acknowledgements.
+
+
+ fanIn.Listen relies on defer ordering to close the output channel only after all goroutines have drained.
+ In fanout.go, fanIn.Listen has:
+
defer close(this.output)
+defer wg.Wait()
+ Deferred calls execute in LIFO order, so wg.Wait() runs before
+ close(this.output) — which is the correct and intended behavior. However, the
+ ordering is non-obvious and a future edit (e.g., swapping the defer lines) would silently
+ introduce a race where the output channel is closed before all goroutines have finished
+ forwarding to it. Consider merging into a single explicit sequence at the end of the function
+ body, or adding a comment documenting the required defer order.
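+
+ The "single explicit sequence" alternative might look like the sketch below. It is a
+ simplified stand-in for illustration, not the code in fanout.go: merge
+ and its parameters are hypothetical names, and the element type is reduced to
+ int.
+
```go
package main

import (
	"fmt"
	"sync"
)

// merge forwards every value from inputs to output and closes output
// only after all forwarding goroutines have finished.
func merge(output chan<- int, inputs ...<-chan int) {
	var wg sync.WaitGroup
	for _, input := range inputs {
		wg.Add(1)
		go func(input <-chan int) {
			defer wg.Done()
			for value := range input {
				output <- value
			}
		}(input)
	}
	// Explicit order instead of two defers: wait for the forwarders, then close.
	wg.Wait()
	close(output)
}

func main() {
	a, b := make(chan int, 2), make(chan int, 1)
	a <- 1
	a <- 2
	b <- 3
	close(a)
	close(b)

	output := make(chan int, 3)
	merge(output, a, b)

	sum := 0
	for value := range output {
		sum += value
	}
	fmt.Println(sum)
}
```
+
+ Written this way, reordering the two statements is an obviously wrong edit, whereas swapping
+ two adjacent defer lines looks harmless.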
+
+
+ Docker Compose service creates a database named accounting, but tests use messaging_harness_test.
+ doc/docker-compose.yml sets MYSQL_DATABASE: accounting.
+ testdb_test.go ignores this and creates/drops its own messaging_harness_test
+ schema. The accounting database is never used and its presence may confuse readers.
+ It is probably a copy-paste artifact and should either be removed or renamed to something neutral
+ (e.g., MYSQL_DATABASE: harness_bootstrap).
+
+
+
+
diff --git a/doc/work-sessions/2026/2026-06-09_14-22-40-proposal-serialization-fail-fast.html b/doc/work-sessions/2026/2026-06-09_14-22-40-proposal-serialization-fail-fast.html
new file mode 100644
index 0000000..b195522
--- /dev/null
+++ b/doc/work-sessions/2026/2026-06-09_14-22-40-proposal-serialization-fail-fast.html
@@ -0,0 +1,216 @@
+
+
+
+
+Proposal: Fail fast on serialization errors
+
+
+
+
Proposal: Fail fast on serialization errors
+
+
Background
+
+ Commit 2b7c630 established the contract: it is the domain model's
+ responsibility to only produce values that serialize successfully. A
+ serialization failure is therefore a programming error — a contract violation
+ that should never occur in a correctly built application. But the pipeline must
+ still do something when it occurs, and what it does today is the worst
+ available option.
+
+
+ Today, handlers/harness/02_serialization.go:30-38 tracks a
+ SerializationError via the monitor and then lets the unit of work
+ keep flowing. The failed message proceeds with an empty Content
+ buffer and an empty ContentType, which means:
+
+
+
Stage 03 persists a Messages row with a type but an empty payload — a corrupt assertion that "this event occurred and here is its content."
+
Stage 04 acks the batch upstream, so the input is never redelivered.
+
Stage 05 publishes the empty payload to every downstream subscriber of that message type — the cascading-failure scenario.
+
If the row somehow escapes dispatch, sqladapter.Recover faithfully re-publishes the empty payload at every subsequent startup.
+
+
+
Decision (from design discussion, 2026-06-09)
+
+
Question
Answer
+
Persist a typed, payload-less row?
No
+
Dispatch the message downstream?
No — hard constraint
+
Mark a persisted row dispatched?
Moot (nothing is persisted)
+
+
+ Instead: track the observation, then halt the process before the
+ unit reaches persistence.
+
+
+
Approach
+
+ In 02_serialization.go, when serializer.Serialize returns
+ an error: track the SerializationError exactly as today, then
+ panic with the wrapped error (which satisfies
+ errors.Is(err, ErrSerialization)). The unit is never forwarded.
+
+
+ This composes cleanly with semantics the pipeline already has:
+
+
+
Nothing is persisted, dispatched, or acked — the same
+ "no ack → MQ redelivers" contract the persistence-abandonment path relies on
+ (03_persistence.go:40).
+
The failure is deterministic: on restart the MQ redelivers
+ the input, the domain model produces the same unserializable value, and the app
+ crash-loops loudly until a human fixes the model. Anomaly visibility is maximal.
+
Downstream systems experience only the publisher's absence,
+ never a malformed message.
+
No schema changes; no special cases in Writer,
+ Dispatcher, or Recover.
+
+
+ Why the whole process and not just the unit: by the time
+ serialization runs, the domain model's in-memory state has already advanced
+ (execution happened in stage 01). Dropping the unit and continuing would let MQ
+ redelivery replay inputs against already-mutated state. The only safe recovery
+ point is process death followed by a clean rebuild of state — halt or forward,
+ nothing in between.
+
+
+ Mechanically, the panic occurs inside one of the fanned-out serialization
+ goroutines (SerializerCount, default 4). An unrecovered panic in any
+ goroutine terminates the process with a stack trace on stderr — loud by design.
+ The existing defer close(this.output) still runs during unwind, so
+ test code (which recovers) observes a closed output channel.
+
+
+
Sketch (not final code)
+
err := this.serializer.Serialize(message.Content, message.Value)
+if err != nil {
+ failure := SerializationError{
+ Error: fmt.Errorf("%w: %w", ErrSerialization, err),
+ Value: message.Value,
+ }
+ this.monitor.Track(failure)
+ panic(failure.Error) // contract violation (see commit 2b7c630): halt before
+ // persistence so nothing is stored, acked, or dispatched.
+}
+message.ContentType = this.serializer.ContentType()
+
+
+ Note this also retires the awkward failure.Error = nil / failure.Value = nil
+ reset dance in the current loop — failure becomes a local inside the
+ error branch.
+
+
+
Trade-offs
+
+
Innocent bystanders. Other in-flight units
+ (and sibling messages within the failing unit) die unacked and are redelivered
+ after restart. At-least-once delivery already demands tolerance of this; it is
+ the same blast radius as a power failure.
+
Monitor flush. Track runs
+ synchronously before the panic, but if a monitor implementation buffers
+ asynchronously, the observation may be lost when the process dies. The panic
+ message and stack trace on stderr carry the same information, so the anomaly is
+ never silent.
+
Deliberate outage. The app crash-loops until
+ the domain bug is fixed and deployed. This is the accepted cost: stream
+ integrity outranks availability for a should-never-happen contract violation.
+
No escape hatch (yet). A configurable
+ Options.Fatal(func(error)) hook could replace the bare panic later
+ if an application ever needs graceful-drain-then-exit. Deliberately omitted now:
+ no known consumer needs it, and it would invite "handling" the unhandleable.
+
+
+
Implementation Checklist
+
+
Phase 1 — Red: capture fail-fast semantics in tests (agent)
+
+
☑ In 02_serialization_test.go, rewrite
+ TestSerializerErrorIsTracked as
+ TestSerializerErrorTracksThenPanics: arrange a unit of
+ [good, bad, good] messages with the bad one registered in
+ serializeFail; invoke Listen() synchronously
+ wrapped in a recover-capturing closure (input pre-loaded and closed).
+
☑ Assert: the recovered value is an error satisfying
+ errors.Is(err, ErrSerialization) and wrapping the underlying
+ serializer error.
+
☑ Assert: exactly one SerializationError was tracked
+ (with the offending Value) before the panic.
+
☑ Assert: serialization stopped at the bad message
+ (serializeCalls contains the first good value and the bad value, not
+ the trailing good value).
+
☑ Assert: the failing unit was never forwarded and the output channel
+ was closed (drain yields zero units).
+
☑ Add TestUnitsBeforeFailureRemainForwarded: a fully good
+ unit followed by a failing unit — drain yields only the good unit; panic still
+ observed.
+
☑ Run go test ./handlers/harness/ -run TestSerializationFixture | tail -50
+ — confirm the new tests fail for the right reason (no panic occurs; unit is
+ forwarded today).
+
+
+
Phase 2 — Green: production change (your turn 🟠)
+
+
☑ Modify serialization.Listen in
+ 02_serialization.go to make the Phase 1 tests pass.
+
☑ Run go test ./handlers/harness/ -run TestSerializationFixture | tail -50
+ — confirm green.
+
+
+
Phase 3 — Documentation & sweep (agent)
+
+
☑ Update the package doc comment in config.go to state
+ the fail-fast contract: a serialization failure tracks a
+ SerializationError and panics; values supplied to the pipeline must
+ serialize (per commit 2b7c630).
+
☑ Update the doc comment on SerializationError /
+ ErrSerialization in contracts.go to note the
+ observation immediately precedes a panic.
+
☑ Grep for any other code or docs that assume serialization errors
+ flow through (e.g. empty-ContentType handling) and reconcile.
+ (Only match: the unrelated transport-level SerializationError in the
+ serialization package. Nothing to reconcile.)
+
☑ Run make test | tail -50 — full suite green
+ (includes go mod tidy, go fmt, -race;
+ handlers/harness at 100% coverage).
+
+
+
Your Turn 🟠
+
+
+ Phase 2 is yours: the semantic heart of the change, sized at roughly ten lines in
+ 02_serialization.go. The failing tests from Phase 1 are your
+ specification. Guidance, not code:
+
+
+
Keep the loop structure and defer close(this.output) —
+ the deferred close is what lets the recovering test observe a closed output.
+
On error: build the SerializationError as a local,
+ Track it first, then panic with its (wrapped) error so
+ callers of recover() can use errors.Is(..., ErrSerialization).
+
On success: set message.ContentType as today.
+
The failure variable hoisted above the loop (and its
+ nil-reset lines) should disappear.
+
Replace the "big, fat, hairy deal" comment with one stating the
+ contract and the consequence (halt before persistence; nothing stored, acked, or
+ dispatched; MQ redelivers after restart).
+
+
+ If you'd rather I take Phase 2 as well, just say "skip the your-turn step."
+