diff --git a/README.md b/README.md index 2b3bea5..18a7f94 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ A Python SDK for creating and managing data pipelines between Kafka and ClickHou - Pipeline configuration via YAML or JSON - Schema validation and configuration management - Fine-grained resource control per pipeline component +- Enterprise Edition client (`glassflow.ee`) with DLQ reprocessing and discard ## Installation @@ -157,6 +158,44 @@ client.delete_pipeline("my-pipeline-id") pipeline.delete() ``` +## Enterprise Edition + +The GlassFlow Enterprise Edition adds capabilities on top of the Open Source engine. The SDK exposes them through a drop-in client that extends the Open Source one. Import `Client` from `glassflow.ee` instead of `glassflow.etl`: + +```python +from glassflow.ee import Client + +client = Client(host="your-glassflow-etl-url") +``` + +The Enterprise client does everything the Open Source client does, plus the Enterprise-only features below. Entitlement is enforced by the backend: calling an Enterprise-only operation against a backend that is not licensed for it raises `FeatureNotLicensedError`. + +### DLQ message processing + +When a pipeline component fails to process a message, that message lands in the pipeline's dead-letter queue (DLQ). On the Enterprise client, `pipeline.dlq` adds message management on top of the Open Source `state`, `consume`, and `purge`: + +- `list(batch_size, cursor, component)`: non-destructive paginated read. Returns a page dict with `messages` (each carrying a stable `message_id`, `component`, `error`, `original_message`, and `received_at`), `has_more`, and `next_cursor`. Pass `component` to filter to a single component (`ingestor`, `join`, `sink`, `dedup`, `oltp-receiver`), and pass `next_cursor` back as `cursor` to page. +- `list_iter(batch_size, component)`: lazily iterate over every message, paging via the cursor for you. Yields individual messages, so you do not manage the cursor by hand. +- `reprocess(message_ids)` / `reprocess_all()`: move messages back into the pipeline input to be processed again. +- `discard(message_ids)` / `discard_all()`: permanently remove messages. + +```python +pipeline = client.get_pipeline("my-pipeline-id") + +# Inspect failed messages from the sink only (paged automatically) +ids = [m["message_id"] for m in pipeline.dlq.list_iter(component="sink")] + +# Retry them after fixing the underlying issue +pipeline.dlq.reprocess(ids) # or pipeline.dlq.reprocess_all() + +# Or drop the ones you do not want +pipeline.dlq.discard(["seq_200"]) # or pipeline.dlq.discard_all() +``` + +Reprocessing replays messages through the running pipeline, so the pipeline must be in the `Running` state. Calling `reprocess` on a stopped, terminated, or failed pipeline raises `PipelineNotRunningError`. Discard acts on the queue directly and works in any state. + +`reprocess` and `discard` accept at most 1000 `message_id` values per call. For larger sets, use the `*_all` variants. See the [DLQ documentation](https://docs.glassflow.dev/configuration/dlq) for the full reference. + ## Migrating from V2 to V3 Pipeline version `v2` has been removed. Use `Client.migrate_pipeline_v2_to_v3()` to convert an existing configuration automatically: diff --git a/src/glassflow/ee/__init__.py b/src/glassflow/ee/__init__.py index e54c555..0aff448 100644 --- a/src/glassflow/ee/__init__.py +++ b/src/glassflow/ee/__init__.py @@ -8,6 +8,7 @@ client = Client(host="https://...") pipeline = client.get_pipeline("my-pipeline") + pipeline.dlq.reprocess_all() # Enterprise DLQ management All open-source models are re-exported from :mod:`glassflow.etl` for convenience, so a single import path covers both tiers. @@ -21,11 +22,13 @@ ) from .client import Client +from .dlq import DLQ from .pipeline import Pipeline __all__ = [ "Pipeline", "Client", + "DLQ", "PipelineConfig", "SourceConfig", "SinkConfig", diff --git a/src/glassflow/ee/dlq.py b/src/glassflow/ee/dlq.py new file mode 100644 index 0000000..b6aa3a3 --- /dev/null +++ b/src/glassflow/ee/dlq.py @@ -0,0 +1,234 @@ +from __future__ import annotations + +import warnings +from typing import Any, Dict, Iterator, List, Optional + +from glassflow.etl import errors +from glassflow.etl.dlq import DLQ as _OSSDLQ + +# Mirrors the backend cap on message_ids per mode=selected request +# (dlqSelectedMaxMessageIDs in glassflow-etl-ee). Validated client-side for a +# fast, offline error instead of a round-trip 400. +MAX_SELECTED_MESSAGE_IDS = 1000 + +# Data-plane components a DLQ message can come from (DataPlaneRoles in +# glassflow-etl-ee). Used to validate the list() component filter client-side. +DLQ_COMPONENTS = ("ingestor", "join", "sink", "dedup", "oltp-receiver") + + +class DLQ(_OSSDLQ): + """Enterprise Dead Letter Queue client. + + Extends the open-source :class:`glassflow.etl.dlq.DLQ` with message + management: a non-destructive paginated :meth:`list`, and + :meth:`reprocess`/:meth:`discard` (plus their ``*_all`` variants) that move + messages back into the pipeline or permanently remove them. + + Backend entitlement is enforced server-side; calling these against a backend + that is not licensed for them raises :class:`FeatureNotLicensedError`. + """ + + def list( + self, + batch_size: int = 100, + cursor: Optional[str] = None, + component: Optional[str] = None, + ) -> Dict[str, Any]: + """Read messages from the DLQ without removing them. + + The non-destructive successor to :meth:`consume`. Each message in the + returned page carries a stable ``message_id`` (NATS sequence number, as + a string) plus ``component``, ``error``, ``original_message`` and + ``received_at``. The ``message_id`` values are what :meth:`reprocess` + and :meth:`discard` act on in ``mode=selected``. + + Args: + batch_size: Number of messages per page (between 1 and 1000). + cursor: NATS sequence to resume from, taken from the previous page's + ``next_cursor``; omit for the first page. + component: Filter to messages from a single data-plane component; + must be one of :data:`DLQ_COMPONENTS` (``ingestor``, ``join``, + ``sink``, ``dedup``, ``oltp-receiver``); omit for all components. + + Returns: + A dict with ``messages`` (list of message dicts), ``has_more`` + (bool), and ``next_cursor`` (str, present when ``has_more`` is + true; pass it back as ``cursor`` to fetch the next page). + + Raises: + ValueError: If ``batch_size`` or ``component`` is invalid. + APIError: If the API request fails. + """ + if ( + not isinstance(batch_size, int) + or batch_size < 1 + or batch_size > MAX_SELECTED_MESSAGE_IDS + ): + raise ValueError( + f"batch_size must be an integer between 1 and " + f"{MAX_SELECTED_MESSAGE_IDS}" + ) + + if component is not None and component not in DLQ_COMPONENTS: + raise ValueError(f"component must be one of {', '.join(DLQ_COMPONENTS)}") + + params: Dict[str, Any] = {"batch_size": batch_size} + if cursor is not None: + params["cursor"] = cursor + if component is not None: + params["component"] = component + + response = self._request("GET", f"{self.endpoint}/list", params=params) + if response.status_code == 204 or not response.content: + return {"messages": [], "has_more": False} + return response.json() + + def list_iter( + self, + batch_size: int = 100, + component: Optional[str] = None, + cursor: Optional[str] = None, + ) -> Iterator[Dict[str, Any]]: + """Lazily iterate over every DLQ message, paging via the cursor for you. + + The streaming companion to :meth:`list`: it calls :meth:`list` page by + page and yields each message, so callers do not manage the cursor + themselves. Memory stays flat (one page at a time) and it composes with + ``itertools`` (e.g. ``itertools.islice`` for the first N). + + Args: + batch_size: Messages fetched per underlying page request (1-1000). + component: Optional component filter; see :meth:`list`. + cursor: Optional starting cursor (e.g. to resume a previous run); + omit to start at the beginning of the DLQ. + + Yields: + Individual DLQ message dicts. + """ + while True: + page = self.list(batch_size=batch_size, cursor=cursor, component=component) + yield from page.get("messages", []) + if not page.get("has_more"): + return + cursor = page.get("next_cursor") + # Defensive: a truthy has_more without a cursor would loop forever. + if not cursor: + return + + def reprocess(self, message_ids: List[str]) -> Dict[str, Any]: + """Move specific messages from the DLQ back into the pipeline input. + + Args: + message_ids: ``message_id`` values (from :meth:`list`) to reprocess; + must be non-empty and at most ``MAX_SELECTED_MESSAGE_IDS``. + + Returns: + Dict with ``request_id`` and ``status`` ("accepted"). The republish + happens asynchronously. + + Raises: + ValueError: If ``message_ids`` is empty or too large. + FeatureNotLicensedError: If the backend is not licensed for this. + PipelineNotRunningError: If the pipeline is not in the Running state. + APIError: If the API request fails. + """ + return self._action("reprocess", "selected", message_ids) + + def reprocess_all(self) -> Dict[str, Any]: + """Reprocess every message currently in the DLQ (up to the head at + request time). See :meth:`reprocess`.""" + return self._action("reprocess", "all", None) + + def discard(self, message_ids: List[str]) -> Dict[str, Any]: + """Permanently remove specific messages from the DLQ without + reprocessing them. + + Args: + message_ids: ``message_id`` values (from :meth:`list`) to discard; + must be non-empty and at most ``MAX_SELECTED_MESSAGE_IDS``. + + Returns: + Dict with ``request_id`` and ``discarded_count``. + + Raises: + ValueError: If ``message_ids`` is empty or too large. + FeatureNotLicensedError: If the backend is not licensed for this. + APIError: If the API request fails. + """ + return self._action("discard", "selected", message_ids) + + def discard_all(self) -> Dict[str, Any]: + """Permanently remove every message currently in the DLQ. See + :meth:`discard`.""" + return self._action("discard", "all", None) + + # Deprecated, inherited operations ------------------------------------- + + def consume(self, batch_size: int = 100) -> List[Dict[str, Any]]: + """Deprecated: use :meth:`list`. + + The ``/dlq/consume`` endpoint is being removed in favour of the + non-destructive ``/dlq/list``. This override delegates to :meth:`list` + so existing callers keep working through the transition. + """ + warnings.warn( + "DLQ.consume() is deprecated and the /dlq/consume endpoint is being " + "removed; use DLQ.list() (non-destructive).", + DeprecationWarning, + stacklevel=2, + ) + return self.list(batch_size=batch_size).get("messages", []) + + def purge(self) -> None: + """Deprecated: use :meth:`discard_all`. + + ``/dlq/purge`` remains for backward compatibility but is superseded by + :meth:`discard_all`. + """ + warnings.warn( + "DLQ.purge() is deprecated; use DLQ.discard_all().", + DeprecationWarning, + stacklevel=2, + ) + super().purge() + + def _action( + self, action: str, mode: str, message_ids: Optional[List[str]] + ) -> Dict[str, Any]: + body: Dict[str, Any] = {"mode": mode} + if mode == "selected": + if not message_ids: + raise ValueError( + "message_ids must be non-empty when selecting messages" + ) + if len(message_ids) > MAX_SELECTED_MESSAGE_IDS: + raise ValueError( + f"message_ids cannot exceed {MAX_SELECTED_MESSAGE_IDS} entries" + ) + body["message_ids"] = message_ids + + try: + response = self._request("POST", f"{self.endpoint}/{action}", json=body) + if response.status_code == 204 or not response.content: + return {} + return response.json() + except errors.ForbiddenError as e: + raise errors.FeatureNotLicensedError( + status_code=e.status_code, + message=(f"DLQ {action} requires a GlassFlow Enterprise license"), + response=e.response, + ) from e + except errors.ConflictError as e: + # Reprocess replays through the running pipeline, so the API rejects + # it with 409 when the pipeline is not in the Running state. Discard + # acts on the queue directly and has no such constraint. + if action == "reprocess": + raise errors.PipelineNotRunningError( + status_code=e.status_code, + message=( + "Pipeline must be in the Running state to reprocess DLQ " + "messages" + ), + response=e.response, + ) from e + raise diff --git a/src/glassflow/ee/pipeline.py b/src/glassflow/ee/pipeline.py index f9feec6..7dd57fd 100644 --- a/src/glassflow/ee/pipeline.py +++ b/src/glassflow/ee/pipeline.py @@ -2,11 +2,25 @@ from glassflow.etl.pipeline import Pipeline as _OSSPipeline +from .dlq import DLQ + class Pipeline(_OSSPipeline): """Enterprise Pipeline. - Extends the open-source :class:`glassflow.etl.pipeline.Pipeline`. Currently - a pass-through; Enterprise-only pipeline capabilities are added here in - follow-up work. + Extends the open-source :class:`glassflow.etl.pipeline.Pipeline`. Its ``dlq`` + property exposes the Enterprise :class:`~.dlq.DLQ` (with + ``list``/``reprocess``/``discard``). Construction is inherited unchanged; + only the DLQ collaborator class is swapped via ``_dlq_class``. """ + + _dlq_class = DLQ + + @property + def dlq(self) -> DLQ: + """Get the Enterprise DLQ client for this pipeline.""" + return self._dlq + + @dlq.setter + def dlq(self, dlq: DLQ) -> None: + self._dlq = dlq diff --git a/src/glassflow/etl/api_client.py b/src/glassflow/etl/api_client.py index 221f6ce..701a874 100644 --- a/src/glassflow/etl/api_client.py +++ b/src/glassflow/etl/api_client.py @@ -113,6 +113,8 @@ def _raise_api_error(response: httpx.Response) -> None: raise errors.ForbiddenError(status_code, message, response=response) elif status_code == 404: raise errors.NotFoundError(status_code, message, response=response) + elif status_code == 409: + raise errors.ConflictError(status_code, message, response=response) elif status_code == 422: raise errors.UnprocessableContentError( status_code, message, response=response diff --git a/src/glassflow/etl/errors.py b/src/glassflow/etl/errors.py index b3651f7..b903440 100644 --- a/src/glassflow/etl/errors.py +++ b/src/glassflow/etl/errors.py @@ -34,6 +34,23 @@ class ForbiddenError(APIError): """Raised on 403 Forbidden errors.""" +class FeatureNotLicensedError(ForbiddenError): + """Raised when an Enterprise-only capability is invoked against a backend + that is not licensed for it (the API responds 403). Subclasses + ForbiddenError so existing 403 handling still catches it.""" + + +class ConflictError(APIError): + """Raised on 409 Conflict errors.""" + + +class PipelineNotRunningError(ConflictError): + """Raised when an operation requires a Running pipeline but the pipeline is + in another state (the API responds 409). For example, DLQ reprocessing + replays messages through the running pipeline and is rejected when the + pipeline is stopped, terminated, or failed.""" + + class UnprocessableContentError(APIError): """Raised on 422 Unprocessable Content errors.""" diff --git a/tests/test_ee_dlq.py b/tests/test_ee_dlq.py new file mode 100644 index 0000000..8d53dc6 --- /dev/null +++ b/tests/test_ee_dlq.py @@ -0,0 +1,279 @@ +"""Tests for the Enterprise DLQ: list / reprocess / discard.""" + +from unittest.mock import patch + +import pytest + +from glassflow import ee +from glassflow.etl import errors +from glassflow.etl.dlq import DLQ as OSSDLQ +from tests.data import mock_responses + + +@pytest.fixture +def ee_dlq(): + return ee.DLQ(host="http://localhost:8080", pipeline_id="test-pipeline") + + +@pytest.fixture +def ee_pipeline(valid_config): + config = ee.PipelineConfig(**valid_config) + return ee.Pipeline(host="http://localhost:8080", config=config) + + +class TestEEDLQWiring: + def test_ee_dlq_subclasses_oss(self): + assert issubclass(ee.DLQ, OSSDLQ) + + def test_pipeline_exposes_ee_dlq(self, ee_pipeline): + assert isinstance(ee_pipeline.dlq, ee.DLQ) + + def test_dlq_setter_still_works(self, ee_pipeline, ee_dlq): + ee_pipeline.dlq = ee_dlq + assert ee_pipeline.dlq is ee_dlq + + def test_get_pipeline_dlq_is_ee( + self, mock_success, get_pipeline_response, get_health_payload + ): + client = ee.Client(host="http://localhost:8080") + with mock_success( + [get_pipeline_response, get_health_payload("test-pipeline-id")] + ): + pipeline = client.get_pipeline("test-pipeline-id") + assert isinstance(pipeline.dlq, ee.DLQ) + + +class TestList: + def test_list_success(self, ee_dlq, mock_success): + payload = { + "messages": [ + { + "message_id": "seq_101", + "component": "sink", + "error": "connection refused", + "original_message": "{}", + "received_at": "2026-05-29T14:00:00Z", + } + ], + "next_cursor": "seq_101", + "has_more": True, + } + with mock_success(json_payloads=[payload]) as mock_get: + result = ee_dlq.list(batch_size=50) + + mock_get.assert_called_once_with( + "GET", f"{ee_dlq.endpoint}/list", params={"batch_size": 50} + ) + assert result == payload + assert result["messages"][0]["message_id"] == "seq_101" + + def test_list_with_cursor(self, ee_dlq, mock_success): + with mock_success(json_payloads=[{"messages": [], "has_more": False}]) as m: + ee_dlq.list(batch_size=10, cursor="seq_200") + m.assert_called_once_with( + "GET", + f"{ee_dlq.endpoint}/list", + params={"batch_size": 10, "cursor": "seq_200"}, + ) + + @pytest.mark.parametrize( + "component", ["ingestor", "join", "sink", "dedup", "oltp-receiver"] + ) + def test_list_with_valid_component_filter(self, ee_dlq, mock_success, component): + with mock_success(json_payloads=[{"messages": [], "has_more": False}]) as m: + ee_dlq.list(batch_size=10, component=component) + m.assert_called_once_with( + "GET", + f"{ee_dlq.endpoint}/list", + params={"batch_size": 10, "component": component}, + ) + + @pytest.mark.parametrize("bad", ["otlp-receiver", "Sink", "", "transform"]) + def test_list_invalid_component_raises_client_side(self, ee_dlq, bad): + # Validated client-side before any HTTP call (no mock needed). + with pytest.raises(ValueError, match="component must be one of"): + ee_dlq.list(component=bad) + + def test_list_empty_on_204(self, ee_dlq): + mock_response = mock_responses.create_mock_response_factory()( + status_code=204, json_data=None + ) + with patch("httpx.Client.request", return_value=mock_response): + assert ee_dlq.list() == {"messages": [], "has_more": False} + + @pytest.mark.parametrize("bad", [0, 1001, -1, "10"]) + def test_list_invalid_batch_size(self, ee_dlq, bad): + with pytest.raises(ValueError, match="batch_size must be an integer"): + ee_dlq.list(batch_size=bad) + + +class TestListIter: + def test_pages_through_and_advances_cursor(self, ee_dlq, mock_success): + page1 = { + "messages": [{"message_id": "1"}, {"message_id": "2"}], + "next_cursor": "2", + "has_more": True, + } + page2 = {"messages": [{"message_id": "3"}], "has_more": False} + with mock_success(json_payloads=[page1, page2]) as mock_get: + ids = [m["message_id"] for m in ee_dlq.list_iter(batch_size=2)] + + assert ids == ["1", "2", "3"] + assert mock_get.call_count == 2 + # Second page resumes from the first page's next_cursor. + assert mock_get.call_args_list[1].kwargs["params"] == { + "batch_size": 2, + "cursor": "2", + } + + def test_forwards_component_and_is_lazy(self, ee_dlq, mock_success): + page = {"messages": [{"message_id": "1"}], "has_more": False} + with mock_success(json_payloads=[page]) as mock_get: + it = ee_dlq.list_iter(component="sink") + # Generator is lazy: no request until first iteration. + assert mock_get.call_count == 0 + next(it) + assert mock_get.call_args_list[0].kwargs["params"] == { + "batch_size": 100, + "component": "sink", + } + + def test_stops_when_has_more_without_cursor(self, ee_dlq, mock_success): + # Defensive guard: truthy has_more but no next_cursor must not loop. + page = {"messages": [{"message_id": "1"}], "has_more": True} + with mock_success(json_payloads=[page]) as mock_get: + ids = [m["message_id"] for m in ee_dlq.list_iter()] + assert ids == ["1"] + assert mock_get.call_count == 1 + + +class TestReprocess: + def test_reprocess_selected(self, ee_dlq, mock_success): + with mock_success( + json_payloads=[{"request_id": "rep_1", "status": "accepted"}] + ) as mock_post: + result = ee_dlq.reprocess(["seq_101", "seq_102"]) + + mock_post.assert_called_once_with( + "POST", + f"{ee_dlq.endpoint}/reprocess", + json={"mode": "selected", "message_ids": ["seq_101", "seq_102"]}, + ) + assert result == {"request_id": "rep_1", "status": "accepted"} + + def test_reprocess_all(self, ee_dlq, mock_success): + with mock_success( + json_payloads=[{"request_id": "rep_2", "status": "accepted"}] + ) as mock_post: + ee_dlq.reprocess_all() + mock_post.assert_called_once_with( + "POST", f"{ee_dlq.endpoint}/reprocess", json={"mode": "all"} + ) + + def test_reprocess_empty_ids_raises(self, ee_dlq): + with pytest.raises(ValueError, match="must be non-empty"): + ee_dlq.reprocess([]) + + def test_reprocess_too_many_ids_raises(self, ee_dlq): + with pytest.raises(ValueError, match="cannot exceed 1000"): + ee_dlq.reprocess([str(i) for i in range(1001)]) + + +class TestDiscard: + def test_discard_selected(self, ee_dlq, mock_success): + with mock_success( + json_payloads=[{"request_id": "dis_1", "discarded_count": 2}] + ) as mock_post: + result = ee_dlq.discard(["seq_1", "seq_2"]) + + mock_post.assert_called_once_with( + "POST", + f"{ee_dlq.endpoint}/discard", + json={"mode": "selected", "message_ids": ["seq_1", "seq_2"]}, + ) + assert result == {"request_id": "dis_1", "discarded_count": 2} + + def test_discard_all(self, ee_dlq, mock_success): + with mock_success( + json_payloads=[{"request_id": "dis_2", "discarded_count": 9}] + ) as mock_post: + ee_dlq.discard_all() + mock_post.assert_called_once_with( + "POST", f"{ee_dlq.endpoint}/discard", json={"mode": "all"} + ) + + def test_discard_empty_ids_raises(self, ee_dlq): + with pytest.raises(ValueError, match="must be non-empty"): + ee_dlq.discard([]) + + +class TestDeprecatedInherited: + def test_consume_warns_and_delegates_to_list(self, ee_dlq, mock_success): + envelope = {"messages": [{"message_id": "seq_1"}], "has_more": False} + with mock_success(json_payloads=[envelope]) as mock_get: + with pytest.warns(DeprecationWarning, match="use DLQ.list"): + result = ee_dlq.consume(batch_size=25) + + # Hits /dlq/list, not /dlq/consume. + mock_get.assert_called_once_with( + "GET", f"{ee_dlq.endpoint}/list", params={"batch_size": 25} + ) + # consume() unwraps the envelope to the legacy list shape. + assert result == [{"message_id": "seq_1"}] + + def test_purge_warns_and_hits_purge_endpoint(self, ee_dlq, mock_success): + with mock_success() as mock_post: + with pytest.warns(DeprecationWarning, match="use DLQ.discard_all"): + ee_dlq.purge() + + mock_post.assert_called_once_with("POST", f"{ee_dlq.endpoint}/purge") + + +class TestEntitlement: + def test_forbidden_maps_to_feature_not_licensed(self, ee_dlq): + mock_response = mock_responses.create_mock_response_factory()( + status_code=403, json_data={"message": "Forbidden"} + ) + with patch( + "httpx.Client.request", + side_effect=mock_response.raise_for_status.side_effect, + ): + with pytest.raises(errors.FeatureNotLicensedError) as exc_info: + ee_dlq.reprocess_all() + + assert "Enterprise" in str(exc_info.value) + # Still catchable as a ForbiddenError by existing 403 handling. + assert isinstance(exc_info.value, errors.ForbiddenError) + + +class TestPipelineState: + def _conflict_patch(self): + mock_response = mock_responses.create_mock_response_factory()( + status_code=409, json_data={"message": "pipeline is not running"} + ) + return patch( + "httpx.Client.request", + side_effect=mock_response.raise_for_status.side_effect, + ) + + def test_reprocess_on_non_running_raises_pipeline_not_running(self, ee_dlq): + with self._conflict_patch(): + with pytest.raises(errors.PipelineNotRunningError) as exc_info: + ee_dlq.reprocess(["seq_1"]) + + assert "Running" in str(exc_info.value) + # Still catchable as the generic 409 ConflictError. + assert isinstance(exc_info.value, errors.ConflictError) + + def test_reprocess_all_on_non_running_raises_pipeline_not_running(self, ee_dlq): + with self._conflict_patch(): + with pytest.raises(errors.PipelineNotRunningError): + ee_dlq.reprocess_all() + + def test_discard_409_stays_conflict_error(self, ee_dlq): + # Discard has no Running-state constraint, so a 409 is not remapped. + with self._conflict_patch(): + with pytest.raises(errors.ConflictError) as exc_info: + ee_dlq.discard_all() + + assert not isinstance(exc_info.value, errors.PipelineNotRunningError)