Skip to content

feat(ee): add DLQ list / reprocess / discard to the Enterprise client#69

Merged
PabloPardoGarcia merged 5 commits into
pablo/etl-1186-add-ee-client-to-sdkfrom
pablo/etl-1187-sdk-add-dlq-reprocessing
Jun 15, 2026
Merged

feat(ee): add DLQ list / reprocess / discard to the Enterprise client#69
PabloPardoGarcia merged 5 commits into
pablo/etl-1186-add-ee-client-to-sdkfrom
pablo/etl-1187-sdk-add-dlq-reprocessing

Conversation

@PabloPardoGarcia

@PabloPardoGarcia PabloPardoGarcia commented Jun 11, 2026

Copy link
Copy Markdown
Member

What

Implements ETL-1187 — the SDK side of DLQ message processing — on the Enterprise DLQ (glassflow.ee.DLQ). Aligned with the merged backend glassflow-etl-ee#35 (ETL-1200).

Stacked on #68 (ETL-1186) — base is pablo/etl-1186-add-ee-client-to-sdk; review that first. The diff here is only the DLQ additions.

from glassflow.ee import Client

dlq = client.get_pipeline("my-pipeline").dlq
for msg in dlq.list_iter(component="sink"):     # lazy, auto-paged, filtered
    ...
dlq.reprocess(["123", "456"])                    # or reprocess_all()
dlq.discard(["789"])                             # or discard_all()

Methods (on ee.DLQ)

Method Endpoint Notes
list(batch_size, cursor, component) GET /dlq/list Non-destructive, paginated. Returns the envelope {messages, next_cursor, has_more}. component filters by data-plane stage.
list_iter(batch_size, component, cursor) Lazy generator over list(); pages via the cursor and yields individual messages.
reprocess(message_ids) / reprocess_all() POST /dlq/reprocess mode=selected|all. Moves messages back to the pipeline input.
discard(message_ids) / discard_all() POST /dlq/discard mode=selected|all. Permanent removal.

Each list message carries message_id (NATS seq, as string), component, error, original_message, received_at.

Validation & errors

  • Client-side validation (fast, offline): batch_size 1–1000, message_ids non-empty and ≤1000, and component in {ingestor, join, sink, dedup, oltp-receiver} (DLQ_COMPONENTS).
  • 403 from an unlicensed backend → FeatureNotLicensedError (subclasses ForbiddenError).
  • 409 (reprocess on a non-Running pipeline) → PipelineNotRunningError (subclasses new ConflictError).

Deprecations (endpoints changing under EE)

  • consume() → warns and delegates to list() (the /dlq/consume endpoint is being removed in favour of non-destructive /dlq/list); it unwraps the envelope to keep its legacy list return.
  • purge() → warns, points to discard_all(); the legacy endpoint still works.

Live verification (staging EE API, post-#35 merge)

Validated end-to-end against do-ams3-staging-cluster:

  • Connectivity + list_pipelines; list() envelope shape; list_iter paging.
  • component filter accepted server-side for valid values; invalid rejected client-side (422 as backstop).
  • discard_all(){request_id, discarded_count} in any pipeline state.
  • reprocess on a Stopped pipeline → 409 → PipelineNotRunningError (confirmed).
  • Error mapping: 404 → PipelineNotFoundError, 422 → UnprocessableContentError, 409 → PipelineNotRunningError.

Not exercised with populated data (needs failing rows + a flowing pipeline): list field payloads, real-next_cursor pagination, reprocess/discard by real message_id, and the reprocess 202 happy path. Note: reprocess on a Running pipeline with no available source reprocessor returns 503 no_reprocessors_acknowledged, currently surfaced as a generic APIError.

Tests

tests/test_ee_dlq.py: list envelope + cursor + component (valid/invalid) + 1000 cap, list_iter (multi-page cursor advance, laziness, defensive stop), reprocess/discard request shapes + id validation, consume/purge deprecation, and 403/409 mapping. Full suite green, ruff clean.

🤖 Generated with Claude Code

PabloPardoGarcia and others added 5 commits June 11, 2026 18:27
Implements the SDK side of DLQ message processing (ETL-1187) on the Enterprise
DLQ:
- `ee.DLQ.list(batch_size, cursor)` — non-destructive, paginated read. Messages
  now carry `message_id` (NATS seq), `source`, and `received_at`.
- `reprocess(message_ids)` / `reprocess_all()` — move messages back into the
  pipeline input (POST /dlq/reprocess, mode=selected|all).
- `discard(message_ids)` / `discard_all()` — permanently remove (POST
  /dlq/discard, mode=selected|all).

Deprecate the inherited operations whose endpoints are changing under EE:
- `consume()` warns and delegates to `list()` (the /dlq/consume endpoint is
  being removed in favour of non-destructive /dlq/list), so callers keep
  working through the transition.
- `purge()` warns and points to `discard_all()`; the legacy endpoint still works.

message_ids are validated client-side (non-empty, <=1000) to mirror the backend
and fail fast. A 403 from an unlicensed backend maps to FeatureNotLicensedError
(re-added), which subclasses ForbiddenError so existing 403 handling still
catches it. `ee.Pipeline` wires `_dlq_class` to the EE DLQ so
`client.get_pipeline(id).dlq` exposes these methods.

Contract per the DLQ Message Processing design doc / glassflow-etl-ee API. The
backend reprocess/discard handlers and the consume->list rename are still in
flight; the SDK is written against the agreed contract and tests mock the HTTP
layer.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Reprocess replays messages through the running pipeline, so the API returns
409 Conflict when the pipeline is not in the Running state. Map it to a clear,
catchable error instead of a generic APIError:

- Add ConflictError (409) to the base client error mapping.
- Add PipelineNotRunningError(ConflictError); reprocess/reprocess_all raise it
  on 409. Discard has no Running-state constraint, so its 409s stay ConflictError.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add an Enterprise Edition section covering the drop-in `glassflow.ee` client
and DLQ management (list / reprocess / discard), including the
FeatureNotLicensedError and PipelineNotRunningError behaviors and the 1000-id
limit. Add a Features bullet for discoverability.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Align the EE DLQ list endpoint with glassflow-etl-ee#35 (ETL-1200) and make
iterating messages ergonomic:

- list() gains a `component` filter (ingestor/join/sink/dedup/oltp-receiver)
  and now returns the paginated envelope {messages, next_cursor, has_more}
  instead of a bare list; batch_size max raised to the backend's 1000.
- Add list_iter(), a lazy generator that pages via the cursor and yields
  individual messages, so callers do not manage the cursor by hand.
- consume() (deprecated) unwraps the new envelope to keep its legacy list shape.

Update tests and the README accordingly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add DLQ_COMPONENTS and reject an unknown `component` in list() with a ValueError
before the request, instead of relying only on the server's 422. Verified
against the live staging EE API (glassflow-etl-ee#35): valid components accepted,
unknown rejected, and reprocess on a stopped pipeline returns 409 ->
PipelineNotRunningError as mapped.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@PabloPardoGarcia PabloPardoGarcia merged commit 4453cfd into pablo/etl-1186-add-ee-client-to-sdk Jun 15, 2026
@PabloPardoGarcia PabloPardoGarcia deleted the pablo/etl-1187-sdk-add-dlq-reprocessing branch June 15, 2026 16:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant