feat(ee): add DLQ list / reprocess / discard to the Enterprise client#69
Merged
PabloPardoGarcia merged 5 commits intoJun 15, 2026
Conversation
Implements the SDK side of DLQ message processing (ETL-1187) on the Enterprise DLQ: - `ee.DLQ.list(batch_size, cursor)` — non-destructive, paginated read. Messages now carry `message_id` (NATS seq), `source`, and `received_at`. - `reprocess(message_ids)` / `reprocess_all()` — move messages back into the pipeline input (POST /dlq/reprocess, mode=selected|all). - `discard(message_ids)` / `discard_all()` — permanently remove (POST /dlq/discard, mode=selected|all). Deprecate the inherited operations whose endpoints are changing under EE: - `consume()` warns and delegates to `list()` (the /dlq/consume endpoint is being removed in favour of non-destructive /dlq/list), so callers keep working through the transition. - `purge()` warns and points to `discard_all()`; the legacy endpoint still works. message_ids are validated client-side (non-empty, <=1000) to mirror the backend and fail fast. A 403 from an unlicensed backend maps to FeatureNotLicensedError (re-added), which subclasses ForbiddenError so existing 403 handling still catches it. `ee.Pipeline` wires `_dlq_class` to the EE DLQ so `client.get_pipeline(id).dlq` exposes these methods. Contract per the DLQ Message Processing design doc / glassflow-etl-ee API. The backend reprocess/discard handlers and the consume->list rename are still in flight; the SDK is written against the agreed contract and tests mock the HTTP layer. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Reprocess replays messages through the running pipeline, so the API returns 409 Conflict when the pipeline is not in the Running state. Map it to a clear, catchable error instead of a generic APIError: - Add ConflictError (409) to the base client error mapping. - Add PipelineNotRunningError(ConflictError); reprocess/reprocess_all raise it on 409. Discard has no Running-state constraint, so its 409s stay ConflictError. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add an Enterprise Edition section covering the drop-in `glassflow.ee` client and DLQ management (list / reprocess / discard), including the FeatureNotLicensedError and PipelineNotRunningError behaviors and the 1000-id limit. Add a Features bullet for discoverability. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Align the EE DLQ list endpoint with glassflow-etl-ee#35 (ETL-1200) and make
iterating messages ergonomic:
- list() gains a `component` filter (ingestor/join/sink/dedup/oltp-receiver)
and now returns the paginated envelope {messages, next_cursor, has_more}
instead of a bare list; batch_size max raised to the backend's 1000.
- Add list_iter(), a lazy generator that pages via the cursor and yields
individual messages, so callers do not manage the cursor by hand.
- consume() (deprecated) unwraps the new envelope to keep its legacy list shape.
Update tests and the README accordingly.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add DLQ_COMPONENTS and reject an unknown `component` in list() with a ValueError before the request, instead of relying only on the server's 422. Verified against the live staging EE API (glassflow-etl-ee#35): valid components accepted, unknown rejected, and reprocess on a stopped pipeline returns 409 -> PipelineNotRunningError as mapped. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This was referenced Jun 16, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Implements ETL-1187 — the SDK side of DLQ message processing — on the Enterprise DLQ (
glassflow.ee.DLQ). Aligned with the merged backend glassflow-etl-ee#35 (ETL-1200).Stacked on #68 (ETL-1186) — base is
pablo/etl-1186-add-ee-client-to-sdk; review that first. The diff here is only the DLQ additions.Methods (on
ee.DLQ)list(batch_size, cursor, component)GET /dlq/list{messages, next_cursor, has_more}.componentfilters by data-plane stage.list_iter(batch_size, component, cursor)list(); pages via the cursor and yields individual messages.reprocess(message_ids)/reprocess_all()POST /dlq/reprocessmode=selected|all. Moves messages back to the pipeline input.discard(message_ids)/discard_all()POST /dlq/discardmode=selected|all. Permanent removal.Each
listmessage carriesmessage_id(NATS seq, as string),component,error,original_message,received_at.Validation & errors
batch_size1–1000,message_idsnon-empty and ≤1000, andcomponentin{ingestor, join, sink, dedup, oltp-receiver}(DLQ_COMPONENTS).403from an unlicensed backend →FeatureNotLicensedError(subclassesForbiddenError).409(reprocess on a non-Running pipeline) →PipelineNotRunningError(subclasses newConflictError).Deprecations (endpoints changing under EE)
consume()→ warns and delegates tolist()(the/dlq/consumeendpoint is being removed in favour of non-destructive/dlq/list); it unwraps the envelope to keep its legacy list return.purge()→ warns, points todiscard_all(); the legacy endpoint still works.Live verification (staging EE API, post-#35 merge)
Validated end-to-end against
do-ams3-staging-cluster:list_pipelines;list()envelope shape;list_iterpaging.componentfilter accepted server-side for valid values; invalid rejected client-side (422 as backstop).discard_all()→{request_id, discarded_count}in any pipeline state.PipelineNotRunningError(confirmed).PipelineNotFoundError, 422 →UnprocessableContentError, 409 →PipelineNotRunningError.Not exercised with populated data (needs failing rows + a flowing pipeline):
listfield payloads, real-next_cursorpagination,reprocess/discardby realmessage_id, and the reprocess202happy path. Note: reprocess on a Running pipeline with no available source reprocessor returns503 no_reprocessors_acknowledged, currently surfaced as a genericAPIError.Tests
tests/test_ee_dlq.py: list envelope + cursor + component (valid/invalid) + 1000 cap,list_iter(multi-page cursor advance, laziness, defensive stop), reprocess/discard request shapes + id validation,consume/purgedeprecation, and 403/409 mapping. Full suite green, ruff clean.🤖 Generated with Claude Code