From eb395b44a6eb1a73f8acae135b7b4c83d134e9bd Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Thu, 11 Jun 2026 18:27:24 +0200 Subject: [PATCH 1/5] feat(ee): add DLQ list / reprocess / discard to the Enterprise client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the SDK side of DLQ message processing (ETL-1187) on the Enterprise DLQ: - `ee.DLQ.list(batch_size, cursor)` — non-destructive, paginated read. Messages now carry `message_id` (NATS seq), `source`, and `received_at`. - `reprocess(message_ids)` / `reprocess_all()` — move messages back into the pipeline input (POST /dlq/reprocess, mode=selected|all). - `discard(message_ids)` / `discard_all()` — permanently remove (POST /dlq/discard, mode=selected|all). Deprecate the inherited operations whose endpoints are changing under EE: - `consume()` warns and delegates to `list()` (the /dlq/consume endpoint is being removed in favour of non-destructive /dlq/list), so callers keep working through the transition. - `purge()` warns and points to `discard_all()`; the legacy endpoint still works. message_ids are validated client-side (non-empty, <=1000) to mirror the backend and fail fast. A 403 from an unlicensed backend maps to FeatureNotLicensedError (re-added), which subclasses ForbiddenError so existing 403 handling still catches it. `ee.Pipeline` wires `_dlq_class` to the EE DLQ so `client.get_pipeline(id).dlq` exposes these methods. Contract per the DLQ Message Processing design doc / glassflow-etl-ee API. The backend reprocess/discard handlers and the consume->list rename are still in flight; the SDK is written against the agreed contract and tests mock the HTTP layer. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/glassflow/ee/__init__.py | 3 + src/glassflow/ee/dlq.py | 170 ++++++++++++++++++++++++++++++++ src/glassflow/ee/pipeline.py | 20 +++- src/glassflow/etl/errors.py | 6 ++ tests/test_ee_dlq.py | 182 +++++++++++++++++++++++++++++++++++ 5 files changed, 378 insertions(+), 3 deletions(-) create mode 100644 src/glassflow/ee/dlq.py create mode 100644 tests/test_ee_dlq.py diff --git a/src/glassflow/ee/__init__.py b/src/glassflow/ee/__init__.py index e54c555..0aff448 100644 --- a/src/glassflow/ee/__init__.py +++ b/src/glassflow/ee/__init__.py @@ -8,6 +8,7 @@ client = Client(host="https://...") pipeline = client.get_pipeline("my-pipeline") + pipeline.dlq.reprocess_all() # Enterprise DLQ management All open-source models are re-exported from :mod:`glassflow.etl` for convenience, so a single import path covers both tiers. @@ -21,11 +22,13 @@ ) from .client import Client +from .dlq import DLQ from .pipeline import Pipeline __all__ = [ "Pipeline", "Client", + "DLQ", "PipelineConfig", "SourceConfig", "SinkConfig", diff --git a/src/glassflow/ee/dlq.py b/src/glassflow/ee/dlq.py new file mode 100644 index 0000000..553f5da --- /dev/null +++ b/src/glassflow/ee/dlq.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +import warnings +from typing import Any, Dict, List, Optional + +from glassflow.etl import errors +from glassflow.etl.dlq import DLQ as _OSSDLQ + +# Mirrors the backend cap on message_ids per mode=selected request +# (dlqSelectedMaxMessageIDs in glassflow-etl-ee). Validated client-side for a +# fast, offline error instead of a round-trip 400. +MAX_SELECTED_MESSAGE_IDS = 1000 + + +class DLQ(_OSSDLQ): + """Enterprise Dead Letter Queue client. + + Extends the open-source :class:`glassflow.etl.dlq.DLQ` with message + management: a non-destructive paginated :meth:`list`, and + :meth:`reprocess`/:meth:`discard` (plus their ``*_all`` variants) that move + messages back into the pipeline or permanently remove them. + + Backend entitlement is enforced server-side; calling these against a backend + that is not licensed for them raises :class:`FeatureNotLicensedError`. + """ + + def list( + self, batch_size: int = 100, cursor: Optional[str] = None + ) -> List[Dict[str, Any]]: + """Read messages from the DLQ without removing them. + + The non-destructive successor to :meth:`consume`. Each message carries a + stable ``message_id`` (NATS sequence number, as a string) plus + ``source``, ``component``, ``error``, ``original_message`` and + ``received_at``. The ``message_id`` values are what :meth:`reprocess` + and :meth:`discard` act on in ``mode=selected``. + + Args: + batch_size: Number of messages to read (between 1 and 100). + cursor: Opaque pagination cursor from a previous page; omit for the + first page. + + Returns: + List of DLQ message dicts. + + Raises: + ValueError: If ``batch_size`` is invalid. + APIError: If the API request fails. + """ + if ( + not isinstance(batch_size, int) + or batch_size < 1 + or batch_size > self._max_batch_size + ): + raise ValueError( + f"batch_size must be an integer between 1 and {self._max_batch_size}" + ) + + params: Dict[str, Any] = {"batch_size": batch_size} + if cursor is not None: + params["cursor"] = cursor + + response = self._request("GET", f"{self.endpoint}/list", params=params) + if response.status_code == 204 or not response.content: + return [] + return response.json() + + def reprocess(self, message_ids: List[str]) -> Dict[str, Any]: + """Move specific messages from the DLQ back into the pipeline input. + + Args: + message_ids: ``message_id`` values (from :meth:`list`) to reprocess; + must be non-empty and at most ``MAX_SELECTED_MESSAGE_IDS``. + + Returns: + Dict with ``request_id`` and ``status`` ("accepted"). The republish + happens asynchronously. + + Raises: + ValueError: If ``message_ids`` is empty or too large. + FeatureNotLicensedError: If the backend is not licensed for this. + APIError: If the API request fails (e.g. 409 if the pipeline is not + Running). + """ + return self._action("reprocess", "selected", message_ids) + + def reprocess_all(self) -> Dict[str, Any]: + """Reprocess every message currently in the DLQ (up to the head at + request time). See :meth:`reprocess`.""" + return self._action("reprocess", "all", None) + + def discard(self, message_ids: List[str]) -> Dict[str, Any]: + """Permanently remove specific messages from the DLQ without + reprocessing them. + + Args: + message_ids: ``message_id`` values (from :meth:`list`) to discard; + must be non-empty and at most ``MAX_SELECTED_MESSAGE_IDS``. + + Returns: + Dict with ``request_id`` and ``discarded_count``. + + Raises: + ValueError: If ``message_ids`` is empty or too large. + FeatureNotLicensedError: If the backend is not licensed for this. + APIError: If the API request fails. + """ + return self._action("discard", "selected", message_ids) + + def discard_all(self) -> Dict[str, Any]: + """Permanently remove every message currently in the DLQ. See + :meth:`discard`.""" + return self._action("discard", "all", None) + + # Deprecated, inherited operations ------------------------------------- + + def consume(self, batch_size: int = 100) -> List[Dict[str, Any]]: + """Deprecated: use :meth:`list`. + + The ``/dlq/consume`` endpoint is being removed in favour of the + non-destructive ``/dlq/list``. This override delegates to :meth:`list` + so existing callers keep working through the transition. + """ + warnings.warn( + "DLQ.consume() is deprecated and the /dlq/consume endpoint is being " + "removed; use DLQ.list() (non-destructive).", + DeprecationWarning, + stacklevel=2, + ) + return self.list(batch_size=batch_size) + + def purge(self) -> None: + """Deprecated: use :meth:`discard_all`. + + ``/dlq/purge`` remains for backward compatibility but is superseded by + :meth:`discard_all`. + """ + warnings.warn( + "DLQ.purge() is deprecated; use DLQ.discard_all().", + DeprecationWarning, + stacklevel=2, + ) + super().purge() + + def _action( + self, action: str, mode: str, message_ids: Optional[List[str]] + ) -> Dict[str, Any]: + body: Dict[str, Any] = {"mode": mode} + if mode == "selected": + if not message_ids: + raise ValueError( + "message_ids must be non-empty when selecting messages" + ) + if len(message_ids) > MAX_SELECTED_MESSAGE_IDS: + raise ValueError( + f"message_ids cannot exceed {MAX_SELECTED_MESSAGE_IDS} entries" + ) + body["message_ids"] = message_ids + + try: + response = self._request("POST", f"{self.endpoint}/{action}", json=body) + if response.status_code == 204 or not response.content: + return {} + return response.json() + except errors.ForbiddenError as e: + raise errors.FeatureNotLicensedError( + status_code=e.status_code, + message=(f"DLQ {action} requires a GlassFlow Enterprise license"), + response=e.response, + ) from e diff --git a/src/glassflow/ee/pipeline.py b/src/glassflow/ee/pipeline.py index f9feec6..7dd57fd 100644 --- a/src/glassflow/ee/pipeline.py +++ b/src/glassflow/ee/pipeline.py @@ -2,11 +2,25 @@ from glassflow.etl.pipeline import Pipeline as _OSSPipeline +from .dlq import DLQ + class Pipeline(_OSSPipeline): """Enterprise Pipeline. - Extends the open-source :class:`glassflow.etl.pipeline.Pipeline`. Currently - a pass-through; Enterprise-only pipeline capabilities are added here in - follow-up work. + Extends the open-source :class:`glassflow.etl.pipeline.Pipeline`. Its ``dlq`` + property exposes the Enterprise :class:`~.dlq.DLQ` (with + ``list``/``reprocess``/``discard``). Construction is inherited unchanged; + only the DLQ collaborator class is swapped via ``_dlq_class``. """ + + _dlq_class = DLQ + + @property + def dlq(self) -> DLQ: + """Get the Enterprise DLQ client for this pipeline.""" + return self._dlq + + @dlq.setter + def dlq(self, dlq: DLQ) -> None: + self._dlq = dlq diff --git a/src/glassflow/etl/errors.py b/src/glassflow/etl/errors.py index b3651f7..5292103 100644 --- a/src/glassflow/etl/errors.py +++ b/src/glassflow/etl/errors.py @@ -34,6 +34,12 @@ class ForbiddenError(APIError): """Raised on 403 Forbidden errors.""" +class FeatureNotLicensedError(ForbiddenError): + """Raised when an Enterprise-only capability is invoked against a backend + that is not licensed for it (the API responds 403). Subclasses + ForbiddenError so existing 403 handling still catches it.""" + + class UnprocessableContentError(APIError): """Raised on 422 Unprocessable Content errors.""" diff --git a/tests/test_ee_dlq.py b/tests/test_ee_dlq.py new file mode 100644 index 0000000..591894d --- /dev/null +++ b/tests/test_ee_dlq.py @@ -0,0 +1,182 @@ +"""Tests for the Enterprise DLQ: list / reprocess / discard.""" + +from unittest.mock import patch + +import pytest + +from glassflow import ee +from glassflow.etl import errors +from glassflow.etl.dlq import DLQ as OSSDLQ +from tests.data import mock_responses + + +@pytest.fixture +def ee_dlq(): + return ee.DLQ(host="http://localhost:8080", pipeline_id="test-pipeline") + + +@pytest.fixture +def ee_pipeline(valid_config): + config = ee.PipelineConfig(**valid_config) + return ee.Pipeline(host="http://localhost:8080", config=config) + + +class TestEEDLQWiring: + def test_ee_dlq_subclasses_oss(self): + assert issubclass(ee.DLQ, OSSDLQ) + + def test_pipeline_exposes_ee_dlq(self, ee_pipeline): + assert isinstance(ee_pipeline.dlq, ee.DLQ) + + def test_dlq_setter_still_works(self, ee_pipeline, ee_dlq): + ee_pipeline.dlq = ee_dlq + assert ee_pipeline.dlq is ee_dlq + + def test_get_pipeline_dlq_is_ee( + self, mock_success, get_pipeline_response, get_health_payload + ): + client = ee.Client(host="http://localhost:8080") + with mock_success( + [get_pipeline_response, get_health_payload("test-pipeline-id")] + ): + pipeline = client.get_pipeline("test-pipeline-id") + assert isinstance(pipeline.dlq, ee.DLQ) + + +class TestList: + def test_list_success(self, ee_dlq, mock_success): + payload = [ + { + "message_id": "seq_101", + "source": "source-0", + "component": "sink", + "error": "connection refused", + "original_message": "{}", + "received_at": "2026-05-29T14:00:00Z", + } + ] + with mock_success(json_payloads=[payload]) as mock_get: + result = ee_dlq.list(batch_size=50) + + mock_get.assert_called_once_with( + "GET", f"{ee_dlq.endpoint}/list", params={"batch_size": 50} + ) + assert result == payload + + def test_list_with_cursor(self, ee_dlq, mock_success): + with mock_success(json_payloads=[[]]) as mock_get: + ee_dlq.list(batch_size=10, cursor="seq_200") + mock_get.assert_called_once_with( + "GET", + f"{ee_dlq.endpoint}/list", + params={"batch_size": 10, "cursor": "seq_200"}, + ) + + def test_list_empty_on_204(self, ee_dlq): + mock_response = mock_responses.create_mock_response_factory()( + status_code=204, json_data=None + ) + with patch("httpx.Client.request", return_value=mock_response): + assert ee_dlq.list() == [] + + @pytest.mark.parametrize("bad", [0, 101, -1, "10"]) + def test_list_invalid_batch_size(self, ee_dlq, bad): + with pytest.raises(ValueError, match="batch_size must be an integer"): + ee_dlq.list(batch_size=bad) + + +class TestReprocess: + def test_reprocess_selected(self, ee_dlq, mock_success): + with mock_success( + json_payloads=[{"request_id": "rep_1", "status": "accepted"}] + ) as mock_post: + result = ee_dlq.reprocess(["seq_101", "seq_102"]) + + mock_post.assert_called_once_with( + "POST", + f"{ee_dlq.endpoint}/reprocess", + json={"mode": "selected", "message_ids": ["seq_101", "seq_102"]}, + ) + assert result == {"request_id": "rep_1", "status": "accepted"} + + def test_reprocess_all(self, ee_dlq, mock_success): + with mock_success( + json_payloads=[{"request_id": "rep_2", "status": "accepted"}] + ) as mock_post: + ee_dlq.reprocess_all() + mock_post.assert_called_once_with( + "POST", f"{ee_dlq.endpoint}/reprocess", json={"mode": "all"} + ) + + def test_reprocess_empty_ids_raises(self, ee_dlq): + with pytest.raises(ValueError, match="must be non-empty"): + ee_dlq.reprocess([]) + + def test_reprocess_too_many_ids_raises(self, ee_dlq): + with pytest.raises(ValueError, match="cannot exceed 1000"): + ee_dlq.reprocess([str(i) for i in range(1001)]) + + +class TestDiscard: + def test_discard_selected(self, ee_dlq, mock_success): + with mock_success( + json_payloads=[{"request_id": "dis_1", "discarded_count": 2}] + ) as mock_post: + result = ee_dlq.discard(["seq_1", "seq_2"]) + + mock_post.assert_called_once_with( + "POST", + f"{ee_dlq.endpoint}/discard", + json={"mode": "selected", "message_ids": ["seq_1", "seq_2"]}, + ) + assert result == {"request_id": "dis_1", "discarded_count": 2} + + def test_discard_all(self, ee_dlq, mock_success): + with mock_success( + json_payloads=[{"request_id": "dis_2", "discarded_count": 9}] + ) as mock_post: + ee_dlq.discard_all() + mock_post.assert_called_once_with( + "POST", f"{ee_dlq.endpoint}/discard", json={"mode": "all"} + ) + + def test_discard_empty_ids_raises(self, ee_dlq): + with pytest.raises(ValueError, match="must be non-empty"): + ee_dlq.discard([]) + + +class TestDeprecatedInherited: + def test_consume_warns_and_delegates_to_list(self, ee_dlq, mock_success): + with mock_success(json_payloads=[[{"message_id": "seq_1"}]]) as mock_get: + with pytest.warns(DeprecationWarning, match="use DLQ.list"): + result = ee_dlq.consume(batch_size=25) + + # Hits /dlq/list, not /dlq/consume. + mock_get.assert_called_once_with( + "GET", f"{ee_dlq.endpoint}/list", params={"batch_size": 25} + ) + assert result == [{"message_id": "seq_1"}] + + def test_purge_warns_and_hits_purge_endpoint(self, ee_dlq, mock_success): + with mock_success() as mock_post: + with pytest.warns(DeprecationWarning, match="use DLQ.discard_all"): + ee_dlq.purge() + + mock_post.assert_called_once_with("POST", f"{ee_dlq.endpoint}/purge") + + +class TestEntitlement: + def test_forbidden_maps_to_feature_not_licensed(self, ee_dlq): + mock_response = mock_responses.create_mock_response_factory()( + status_code=403, json_data={"message": "Forbidden"} + ) + with patch( + "httpx.Client.request", + side_effect=mock_response.raise_for_status.side_effect, + ): + with pytest.raises(errors.FeatureNotLicensedError) as exc_info: + ee_dlq.reprocess_all() + + assert "Enterprise" in str(exc_info.value) + # Still catchable as a ForbiddenError by existing 403 handling. + assert isinstance(exc_info.value, errors.ForbiddenError) From 9f18cae6d879a8e38da8f1907abfa22969b1a92f Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Fri, 12 Jun 2026 15:30:14 +0200 Subject: [PATCH 2/5] feat(ee): surface 409 on reprocess as PipelineNotRunningError Reprocess replays messages through the running pipeline, so the API returns 409 Conflict when the pipeline is not in the Running state. Map it to a clear, catchable error instead of a generic APIError: - Add ConflictError (409) to the base client error mapping. - Add PipelineNotRunningError(ConflictError); reprocess/reprocess_all raise it on 409. Discard has no Running-state constraint, so its 409s stay ConflictError. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/glassflow/ee/dlq.py | 18 ++++++++++++++++-- src/glassflow/etl/api_client.py | 2 ++ src/glassflow/etl/errors.py | 11 +++++++++++ tests/test_ee_dlq.py | 33 +++++++++++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 2 deletions(-) diff --git a/src/glassflow/ee/dlq.py b/src/glassflow/ee/dlq.py index 553f5da..bbf53c0 100644 --- a/src/glassflow/ee/dlq.py +++ b/src/glassflow/ee/dlq.py @@ -79,8 +79,8 @@ def reprocess(self, message_ids: List[str]) -> Dict[str, Any]: Raises: ValueError: If ``message_ids`` is empty or too large. FeatureNotLicensedError: If the backend is not licensed for this. - APIError: If the API request fails (e.g. 409 if the pipeline is not - Running). + PipelineNotRunningError: If the pipeline is not in the Running state. + APIError: If the API request fails. """ return self._action("reprocess", "selected", message_ids) @@ -168,3 +168,17 @@ def _action( message=(f"DLQ {action} requires a GlassFlow Enterprise license"), response=e.response, ) from e + except errors.ConflictError as e: + # Reprocess replays through the running pipeline, so the API rejects + # it with 409 when the pipeline is not in the Running state. Discard + # acts on the queue directly and has no such constraint. + if action == "reprocess": + raise errors.PipelineNotRunningError( + status_code=e.status_code, + message=( + "Pipeline must be in the Running state to reprocess DLQ " + "messages" + ), + response=e.response, + ) from e + raise diff --git a/src/glassflow/etl/api_client.py b/src/glassflow/etl/api_client.py index 221f6ce..701a874 100644 --- a/src/glassflow/etl/api_client.py +++ b/src/glassflow/etl/api_client.py @@ -113,6 +113,8 @@ def _raise_api_error(response: httpx.Response) -> None: raise errors.ForbiddenError(status_code, message, response=response) elif status_code == 404: raise errors.NotFoundError(status_code, message, response=response) + elif status_code == 409: + raise errors.ConflictError(status_code, message, response=response) elif status_code == 422: raise errors.UnprocessableContentError( status_code, message, response=response diff --git a/src/glassflow/etl/errors.py b/src/glassflow/etl/errors.py index 5292103..b903440 100644 --- a/src/glassflow/etl/errors.py +++ b/src/glassflow/etl/errors.py @@ -40,6 +40,17 @@ class FeatureNotLicensedError(ForbiddenError): ForbiddenError so existing 403 handling still catches it.""" +class ConflictError(APIError): + """Raised on 409 Conflict errors.""" + + +class PipelineNotRunningError(ConflictError): + """Raised when an operation requires a Running pipeline but the pipeline is + in another state (the API responds 409). For example, DLQ reprocessing + replays messages through the running pipeline and is rejected when the + pipeline is stopped, terminated, or failed.""" + + class UnprocessableContentError(APIError): """Raised on 422 Unprocessable Content errors.""" diff --git a/tests/test_ee_dlq.py b/tests/test_ee_dlq.py index 591894d..250430a 100644 --- a/tests/test_ee_dlq.py +++ b/tests/test_ee_dlq.py @@ -180,3 +180,36 @@ def test_forbidden_maps_to_feature_not_licensed(self, ee_dlq): assert "Enterprise" in str(exc_info.value) # Still catchable as a ForbiddenError by existing 403 handling. assert isinstance(exc_info.value, errors.ForbiddenError) + + +class TestPipelineState: + def _conflict_patch(self): + mock_response = mock_responses.create_mock_response_factory()( + status_code=409, json_data={"message": "pipeline is not running"} + ) + return patch( + "httpx.Client.request", + side_effect=mock_response.raise_for_status.side_effect, + ) + + def test_reprocess_on_non_running_raises_pipeline_not_running(self, ee_dlq): + with self._conflict_patch(): + with pytest.raises(errors.PipelineNotRunningError) as exc_info: + ee_dlq.reprocess(["seq_1"]) + + assert "Running" in str(exc_info.value) + # Still catchable as the generic 409 ConflictError. + assert isinstance(exc_info.value, errors.ConflictError) + + def test_reprocess_all_on_non_running_raises_pipeline_not_running(self, ee_dlq): + with self._conflict_patch(): + with pytest.raises(errors.PipelineNotRunningError): + ee_dlq.reprocess_all() + + def test_discard_409_stays_conflict_error(self, ee_dlq): + # Discard has no Running-state constraint, so a 409 is not remapped. + with self._conflict_patch(): + with pytest.raises(errors.ConflictError) as exc_info: + ee_dlq.discard_all() + + assert not isinstance(exc_info.value, errors.PipelineNotRunningError) From b3c5f95b1122b038e35a46bb35d15fac4e8bf0e7 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Fri, 12 Jun 2026 15:39:53 +0200 Subject: [PATCH 3/5] docs(readme): document the Enterprise client and DLQ message processing Add an Enterprise Edition section covering the drop-in `glassflow.ee` client and DLQ management (list / reprocess / discard), including the FeatureNotLicensedError and PipelineNotRunningError behaviors and the 1000-id limit. Add a Features bullet for discoverability. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/README.md b/README.md index 2b3bea5..4feac2d 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ A Python SDK for creating and managing data pipelines between Kafka and ClickHou - Pipeline configuration via YAML or JSON - Schema validation and configuration management - Fine-grained resource control per pipeline component +- Enterprise Edition client (`glassflow.ee`) with DLQ reprocessing and discard ## Installation @@ -157,6 +158,44 @@ client.delete_pipeline("my-pipeline-id") pipeline.delete() ``` +## Enterprise Edition + +The GlassFlow Enterprise Edition adds capabilities on top of the Open Source engine. The SDK exposes them through a drop-in client that extends the Open Source one. Import `Client` from `glassflow.ee` instead of `glassflow.etl`: + +```python +from glassflow.ee import Client + +client = Client(host="your-glassflow-etl-url") +``` + +The Enterprise client does everything the Open Source client does, plus the Enterprise-only features below. Entitlement is enforced by the backend: calling an Enterprise-only operation against a backend that is not licensed for it raises `FeatureNotLicensedError`. + +### DLQ message processing + +When a pipeline component fails to process a message, that message lands in the pipeline's dead-letter queue (DLQ). On the Enterprise client, `pipeline.dlq` adds message management on top of the Open Source `state`, `consume`, and `purge`: + +- `list(batch_size, cursor)`: non-destructive paginated read. Each message includes a stable `message_id`, plus its `source` and `received_at`. +- `reprocess(message_ids)` / `reprocess_all()`: move messages back into the pipeline input to be processed again. +- `discard(message_ids)` / `discard_all()`: permanently remove messages. + +```python +pipeline = client.get_pipeline("my-pipeline-id") + +# Inspect failed messages +messages = pipeline.dlq.list(batch_size=50) +ids = [m["message_id"] for m in messages] + +# Retry them after fixing the underlying issue +pipeline.dlq.reprocess(ids) # or pipeline.dlq.reprocess_all() + +# Or drop the ones you do not want +pipeline.dlq.discard(["seq_200"]) # or pipeline.dlq.discard_all() +``` + +Reprocessing replays messages through the running pipeline, so the pipeline must be in the `Running` state. Calling `reprocess` on a stopped, terminated, or failed pipeline raises `PipelineNotRunningError`. Discard acts on the queue directly and works in any state. + +`reprocess` and `discard` accept at most 1000 `message_id` values per call. For larger sets, use the `*_all` variants. See the [DLQ documentation](https://docs.glassflow.dev/configuration/dlq) for the full reference. + ## Migrating from V2 to V3 Pipeline version `v2` has been removed. Use `Client.migrate_pipeline_v2_to_v3()` to convert an existing configuration automatically: From f04afa8eca2986916e9b5f7fa61ac3bd943cbfcb Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Mon, 15 Jun 2026 17:11:08 +0200 Subject: [PATCH 4/5] feat(ee): add DLQ component filter, list pagination, and list_iter Align the EE DLQ list endpoint with glassflow-etl-ee#35 (ETL-1200) and make iterating messages ergonomic: - list() gains a `component` filter (ingestor/join/sink/dedup/oltp-receiver) and now returns the paginated envelope {messages, next_cursor, has_more} instead of a bare list; batch_size max raised to the backend's 1000. - Add list_iter(), a lazy generator that pages via the cursor and yields individual messages, so callers do not manage the cursor by hand. - consume() (deprecated) unwraps the new envelope to keep its legacy list shape. Update tests and the README accordingly. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 8 ++-- src/glassflow/ee/dlq.py | 73 +++++++++++++++++++++++++++-------- tests/test_ee_dlq.py | 85 +++++++++++++++++++++++++++++++++-------- 3 files changed, 132 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 4feac2d..18a7f94 100644 --- a/README.md +++ b/README.md @@ -174,16 +174,16 @@ The Enterprise client does everything the Open Source client does, plus the Ente When a pipeline component fails to process a message, that message lands in the pipeline's dead-letter queue (DLQ). On the Enterprise client, `pipeline.dlq` adds message management on top of the Open Source `state`, `consume`, and `purge`: -- `list(batch_size, cursor)`: non-destructive paginated read. Each message includes a stable `message_id`, plus its `source` and `received_at`. +- `list(batch_size, cursor, component)`: non-destructive paginated read. Returns a page dict with `messages` (each carrying a stable `message_id`, `component`, `error`, `original_message`, and `received_at`), `has_more`, and `next_cursor`. Pass `component` to filter to a single component (`ingestor`, `join`, `sink`, `dedup`, `oltp-receiver`), and pass `next_cursor` back as `cursor` to page. +- `list_iter(batch_size, component)`: lazily iterate over every message, paging via the cursor for you. Yields individual messages, so you do not manage the cursor by hand. - `reprocess(message_ids)` / `reprocess_all()`: move messages back into the pipeline input to be processed again. - `discard(message_ids)` / `discard_all()`: permanently remove messages. ```python pipeline = client.get_pipeline("my-pipeline-id") -# Inspect failed messages -messages = pipeline.dlq.list(batch_size=50) -ids = [m["message_id"] for m in messages] +# Inspect failed messages from the sink only (paged automatically) +ids = [m["message_id"] for m in pipeline.dlq.list_iter(component="sink")] # Retry them after fixing the underlying issue pipeline.dlq.reprocess(ids) # or pipeline.dlq.reprocess_all() diff --git a/src/glassflow/ee/dlq.py b/src/glassflow/ee/dlq.py index bbf53c0..9110a06 100644 --- a/src/glassflow/ee/dlq.py +++ b/src/glassflow/ee/dlq.py @@ -1,7 +1,7 @@ from __future__ import annotations import warnings -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Iterator, List, Optional from glassflow.etl import errors from glassflow.etl.dlq import DLQ as _OSSDLQ @@ -25,46 +25,89 @@ class DLQ(_OSSDLQ): """ def list( - self, batch_size: int = 100, cursor: Optional[str] = None - ) -> List[Dict[str, Any]]: + self, + batch_size: int = 100, + cursor: Optional[str] = None, + component: Optional[str] = None, + ) -> Dict[str, Any]: """Read messages from the DLQ without removing them. - The non-destructive successor to :meth:`consume`. Each message carries a - stable ``message_id`` (NATS sequence number, as a string) plus - ``source``, ``component``, ``error``, ``original_message`` and + The non-destructive successor to :meth:`consume`. Each message in the + returned page carries a stable ``message_id`` (NATS sequence number, as + a string) plus ``component``, ``error``, ``original_message`` and ``received_at``. The ``message_id`` values are what :meth:`reprocess` and :meth:`discard` act on in ``mode=selected``. Args: - batch_size: Number of messages to read (between 1 and 100). - cursor: Opaque pagination cursor from a previous page; omit for the - first page. + batch_size: Number of messages per page (between 1 and 1000). + cursor: NATS sequence to resume from, taken from the previous page's + ``next_cursor``; omit for the first page. + component: Filter to messages from a single data-plane component, + one of ``ingestor``, ``join``, ``sink``, ``dedup``, + ``oltp-receiver``; omit for all components. Returns: - List of DLQ message dicts. + A dict with ``messages`` (list of message dicts), ``has_more`` + (bool), and ``next_cursor`` (str, present when ``has_more`` is + true; pass it back as ``cursor`` to fetch the next page). Raises: ValueError: If ``batch_size`` is invalid. - APIError: If the API request fails. + APIError: If the API request fails (e.g. an unknown ``component``). """ if ( not isinstance(batch_size, int) or batch_size < 1 - or batch_size > self._max_batch_size + or batch_size > MAX_SELECTED_MESSAGE_IDS ): raise ValueError( - f"batch_size must be an integer between 1 and {self._max_batch_size}" + f"batch_size must be an integer between 1 and " + f"{MAX_SELECTED_MESSAGE_IDS}" ) params: Dict[str, Any] = {"batch_size": batch_size} if cursor is not None: params["cursor"] = cursor + if component is not None: + params["component"] = component response = self._request("GET", f"{self.endpoint}/list", params=params) if response.status_code == 204 or not response.content: - return [] + return {"messages": [], "has_more": False} return response.json() + def list_iter( + self, + batch_size: int = 100, + component: Optional[str] = None, + cursor: Optional[str] = None, + ) -> Iterator[Dict[str, Any]]: + """Lazily iterate over every DLQ message, paging via the cursor for you. + + The streaming companion to :meth:`list`: it calls :meth:`list` page by + page and yields each message, so callers do not manage the cursor + themselves. Memory stays flat (one page at a time) and it composes with + ``itertools`` (e.g. ``itertools.islice`` for the first N). + + Args: + batch_size: Messages fetched per underlying page request (1-1000). + component: Optional component filter; see :meth:`list`. + cursor: Optional starting cursor (e.g. to resume a previous run); + omit to start at the beginning of the DLQ. + + Yields: + Individual DLQ message dicts. + """ + while True: + page = self.list(batch_size=batch_size, cursor=cursor, component=component) + yield from page.get("messages", []) + if not page.get("has_more"): + return + cursor = page.get("next_cursor") + # Defensive: a truthy has_more without a cursor would loop forever. + if not cursor: + return + def reprocess(self, message_ids: List[str]) -> Dict[str, Any]: """Move specific messages from the DLQ back into the pipeline input. @@ -127,7 +170,7 @@ def consume(self, batch_size: int = 100) -> List[Dict[str, Any]]: DeprecationWarning, stacklevel=2, ) - return self.list(batch_size=batch_size) + return self.list(batch_size=batch_size).get("messages", []) def purge(self) -> None: """Deprecated: use :meth:`discard_all`. diff --git a/tests/test_ee_dlq.py b/tests/test_ee_dlq.py index 250430a..57c3efe 100644 --- a/tests/test_ee_dlq.py +++ b/tests/test_ee_dlq.py @@ -45,16 +45,19 @@ def test_get_pipeline_dlq_is_ee( class TestList: def test_list_success(self, ee_dlq, mock_success): - payload = [ - { - "message_id": "seq_101", - "source": "source-0", - "component": "sink", - "error": "connection refused", - "original_message": "{}", - "received_at": "2026-05-29T14:00:00Z", - } - ] + payload = { + "messages": [ + { + "message_id": "seq_101", + "component": "sink", + "error": "connection refused", + "original_message": "{}", + "received_at": "2026-05-29T14:00:00Z", + } + ], + "next_cursor": "seq_101", + "has_more": True, + } with mock_success(json_payloads=[payload]) as mock_get: result = ee_dlq.list(batch_size=50) @@ -62,29 +65,79 @@ def test_list_success(self, ee_dlq, mock_success): "GET", f"{ee_dlq.endpoint}/list", params={"batch_size": 50} ) assert result == payload + assert result["messages"][0]["message_id"] == "seq_101" def test_list_with_cursor(self, ee_dlq, mock_success): - with mock_success(json_payloads=[[]]) as mock_get: + with mock_success(json_payloads=[{"messages": [], "has_more": False}]) as m: ee_dlq.list(batch_size=10, cursor="seq_200") - mock_get.assert_called_once_with( + m.assert_called_once_with( "GET", f"{ee_dlq.endpoint}/list", params={"batch_size": 10, "cursor": "seq_200"}, ) + def test_list_with_component_filter(self, ee_dlq, mock_success): + with mock_success(json_payloads=[{"messages": [], "has_more": False}]) as m: + ee_dlq.list(batch_size=10, component="sink") + m.assert_called_once_with( + "GET", + f"{ee_dlq.endpoint}/list", + params={"batch_size": 10, "component": "sink"}, + ) + def test_list_empty_on_204(self, ee_dlq): mock_response = mock_responses.create_mock_response_factory()( status_code=204, json_data=None ) with patch("httpx.Client.request", return_value=mock_response): - assert ee_dlq.list() == [] + assert ee_dlq.list() == {"messages": [], "has_more": False} - @pytest.mark.parametrize("bad", [0, 101, -1, "10"]) + @pytest.mark.parametrize("bad", [0, 1001, -1, "10"]) def test_list_invalid_batch_size(self, ee_dlq, bad): with pytest.raises(ValueError, match="batch_size must be an integer"): ee_dlq.list(batch_size=bad) +class TestListIter: + def test_pages_through_and_advances_cursor(self, ee_dlq, mock_success): + page1 = { + "messages": [{"message_id": "1"}, {"message_id": "2"}], + "next_cursor": "2", + "has_more": True, + } + page2 = {"messages": [{"message_id": "3"}], "has_more": False} + with mock_success(json_payloads=[page1, page2]) as mock_get: + ids = [m["message_id"] for m in ee_dlq.list_iter(batch_size=2)] + + assert ids == ["1", "2", "3"] + assert mock_get.call_count == 2 + # Second page resumes from the first page's next_cursor. + assert mock_get.call_args_list[1].kwargs["params"] == { + "batch_size": 2, + "cursor": "2", + } + + def test_forwards_component_and_is_lazy(self, ee_dlq, mock_success): + page = {"messages": [{"message_id": "1"}], "has_more": False} + with mock_success(json_payloads=[page]) as mock_get: + it = ee_dlq.list_iter(component="sink") + # Generator is lazy: no request until first iteration. + assert mock_get.call_count == 0 + next(it) + assert mock_get.call_args_list[0].kwargs["params"] == { + "batch_size": 100, + "component": "sink", + } + + def test_stops_when_has_more_without_cursor(self, ee_dlq, mock_success): + # Defensive guard: truthy has_more but no next_cursor must not loop. + page = {"messages": [{"message_id": "1"}], "has_more": True} + with mock_success(json_payloads=[page]) as mock_get: + ids = [m["message_id"] for m in ee_dlq.list_iter()] + assert ids == ["1"] + assert mock_get.call_count == 1 + + class TestReprocess: def test_reprocess_selected(self, ee_dlq, mock_success): with mock_success( @@ -147,7 +200,8 @@ def test_discard_empty_ids_raises(self, ee_dlq): class TestDeprecatedInherited: def test_consume_warns_and_delegates_to_list(self, ee_dlq, mock_success): - with mock_success(json_payloads=[[{"message_id": "seq_1"}]]) as mock_get: + envelope = {"messages": [{"message_id": "seq_1"}], "has_more": False} + with mock_success(json_payloads=[envelope]) as mock_get: with pytest.warns(DeprecationWarning, match="use DLQ.list"): result = ee_dlq.consume(batch_size=25) @@ -155,6 +209,7 @@ def test_consume_warns_and_delegates_to_list(self, ee_dlq, mock_success): mock_get.assert_called_once_with( "GET", f"{ee_dlq.endpoint}/list", params={"batch_size": 25} ) + # consume() unwraps the envelope to the legacy list shape. assert result == [{"message_id": "seq_1"}] def test_purge_warns_and_hits_purge_endpoint(self, ee_dlq, mock_success): From 2e7ed6d4cd082a2d8867c6f201061cf1cda9e4d3 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Mon, 15 Jun 2026 18:19:18 +0200 Subject: [PATCH 5/5] feat(ee): validate DLQ list component filter client-side Add DLQ_COMPONENTS and reject an unknown `component` in list() with a ValueError before the request, instead of relying only on the server's 422. Verified against the live staging EE API (glassflow-etl-ee#35): valid components accepted, unknown rejected, and reprocess on a stopped pipeline returns 409 -> PipelineNotRunningError as mapped. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/glassflow/ee/dlq.py | 17 ++++++++++++----- tests/test_ee_dlq.py | 15 ++++++++++++--- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/glassflow/ee/dlq.py b/src/glassflow/ee/dlq.py index 9110a06..b6aa3a3 100644 --- a/src/glassflow/ee/dlq.py +++ b/src/glassflow/ee/dlq.py @@ -11,6 +11,10 @@ # fast, offline error instead of a round-trip 400. MAX_SELECTED_MESSAGE_IDS = 1000 +# Data-plane components a DLQ message can come from (DataPlaneRoles in +# glassflow-etl-ee). Used to validate the list() component filter client-side. +DLQ_COMPONENTS = ("ingestor", "join", "sink", "dedup", "oltp-receiver") + class DLQ(_OSSDLQ): """Enterprise Dead Letter Queue client. @@ -42,9 +46,9 @@ def list( batch_size: Number of messages per page (between 1 and 1000). cursor: NATS sequence to resume from, taken from the previous page's ``next_cursor``; omit for the first page. - component: Filter to messages from a single data-plane component, - one of ``ingestor``, ``join``, ``sink``, ``dedup``, - ``oltp-receiver``; omit for all components. + component: Filter to messages from a single data-plane component; + must be one of :data:`DLQ_COMPONENTS` (``ingestor``, ``join``, + ``sink``, ``dedup``, ``oltp-receiver``); omit for all components. Returns: A dict with ``messages`` (list of message dicts), ``has_more`` @@ -52,8 +56,8 @@ def list( true; pass it back as ``cursor`` to fetch the next page). Raises: - ValueError: If ``batch_size`` is invalid. - APIError: If the API request fails (e.g. an unknown ``component``). + ValueError: If ``batch_size`` or ``component`` is invalid. + APIError: If the API request fails. """ if ( not isinstance(batch_size, int) @@ -65,6 +69,9 @@ def list( f"{MAX_SELECTED_MESSAGE_IDS}" ) + if component is not None and component not in DLQ_COMPONENTS: + raise ValueError(f"component must be one of {', '.join(DLQ_COMPONENTS)}") + params: Dict[str, Any] = {"batch_size": batch_size} if cursor is not None: params["cursor"] = cursor diff --git a/tests/test_ee_dlq.py b/tests/test_ee_dlq.py index 57c3efe..8d53dc6 100644 --- a/tests/test_ee_dlq.py +++ b/tests/test_ee_dlq.py @@ -76,15 +76,24 @@ def test_list_with_cursor(self, ee_dlq, mock_success): params={"batch_size": 10, "cursor": "seq_200"}, ) - def test_list_with_component_filter(self, ee_dlq, mock_success): + @pytest.mark.parametrize( + "component", ["ingestor", "join", "sink", "dedup", "oltp-receiver"] + ) + def test_list_with_valid_component_filter(self, ee_dlq, mock_success, component): with mock_success(json_payloads=[{"messages": [], "has_more": False}]) as m: - ee_dlq.list(batch_size=10, component="sink") + ee_dlq.list(batch_size=10, component=component) m.assert_called_once_with( "GET", f"{ee_dlq.endpoint}/list", - params={"batch_size": 10, "component": "sink"}, + params={"batch_size": 10, "component": component}, ) + @pytest.mark.parametrize("bad", ["otlp-receiver", "Sink", "", "transform"]) + def test_list_invalid_component_raises_client_side(self, ee_dlq, bad): + # Validated client-side before any HTTP call (no mock needed). + with pytest.raises(ValueError, match="component must be one of"): + ee_dlq.list(component=bad) + def test_list_empty_on_204(self, ee_dlq): mock_response = mock_responses.create_mock_response_factory()( status_code=204, json_data=None