Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ A Python SDK for creating and managing data pipelines between Kafka and ClickHou
- Pipeline configuration via YAML or JSON
- Schema validation and configuration management
- Fine-grained resource control per pipeline component
- Enterprise Edition client (`glassflow.ee`) with DLQ reprocessing and discard

## Installation

Expand Down Expand Up @@ -157,6 +158,44 @@ client.delete_pipeline("my-pipeline-id")
pipeline.delete()
```

## Enterprise Edition

The GlassFlow Enterprise Edition adds capabilities on top of the Open Source engine. The SDK exposes them through a drop-in client that extends the Open Source one. Import `Client` from `glassflow.ee` instead of `glassflow.etl`:

```python
from glassflow.ee import Client

client = Client(host="your-glassflow-etl-url")
```

The Enterprise client does everything the Open Source client does, plus the Enterprise-only features below. Entitlement is enforced by the backend: calling an Enterprise-only operation against a backend that is not licensed for it raises `FeatureNotLicensedError`.

### DLQ message processing

When a pipeline component fails to process a message, that message lands in the pipeline's dead-letter queue (DLQ). On the Enterprise client, `pipeline.dlq` adds message management on top of the Open Source `state`, `consume`, and `purge`:

- `list(batch_size, cursor, component)`: non-destructive paginated read. Returns a page dict with `messages` (each carrying a stable `message_id`, `component`, `error`, `original_message`, and `received_at`), `has_more`, and `next_cursor`. Pass `component` to filter to a single component (`ingestor`, `join`, `sink`, `dedup`, `oltp-receiver`), and pass `next_cursor` back as `cursor` to page.
- `list_iter(batch_size, component)`: lazily iterate over every message, paging via the cursor for you. Yields individual messages, so you do not manage the cursor by hand.
- `reprocess(message_ids)` / `reprocess_all()`: move messages back into the pipeline input to be processed again.
- `discard(message_ids)` / `discard_all()`: permanently remove messages.

```python
pipeline = client.get_pipeline("my-pipeline-id")

# Inspect failed messages from the sink only (paged automatically)
ids = [m["message_id"] for m in pipeline.dlq.list_iter(component="sink")]

# Retry them after fixing the underlying issue
pipeline.dlq.reprocess(ids) # or pipeline.dlq.reprocess_all()

# Or drop the ones you do not want
pipeline.dlq.discard(["seq_200"]) # or pipeline.dlq.discard_all()
```

Reprocessing replays messages through the running pipeline, so the pipeline must be in the `Running` state. Calling `reprocess` on a stopped, terminated, or failed pipeline raises `PipelineNotRunningError`. Discard acts on the queue directly and works in any state.

`reprocess` and `discard` accept at most 1000 `message_id` values per call. For larger sets, use the `*_all` variants. See the [DLQ documentation](https://docs.glassflow.dev/configuration/dlq) for the full reference.

## Migrating from V2 to V3

Pipeline version `v2` has been removed. Use `Client.migrate_pipeline_v2_to_v3()` to convert an existing configuration automatically:
Expand Down
3 changes: 3 additions & 0 deletions src/glassflow/ee/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

client = Client(host="https://...")
pipeline = client.get_pipeline("my-pipeline")
pipeline.dlq.reprocess_all() # Enterprise DLQ management

All open-source models are re-exported from :mod:`glassflow.etl` for
convenience, so a single import path covers both tiers.
Expand All @@ -21,11 +22,13 @@
)

from .client import Client
from .dlq import DLQ
from .pipeline import Pipeline

__all__ = [
"Pipeline",
"Client",
"DLQ",
"PipelineConfig",
"SourceConfig",
"SinkConfig",
Expand Down
234 changes: 234 additions & 0 deletions src/glassflow/ee/dlq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
from __future__ import annotations

import warnings
from typing import Any, Dict, Iterator, List, Optional

from glassflow.etl import errors
from glassflow.etl.dlq import DLQ as _OSSDLQ

# Mirrors the backend cap on message_ids per mode=selected request
# (dlqSelectedMaxMessageIDs in glassflow-etl-ee). Validated client-side for a
# fast, offline error instead of a round-trip 400.
MAX_SELECTED_MESSAGE_IDS = 1000

# Data-plane components a DLQ message can come from (DataPlaneRoles in
# glassflow-etl-ee). Used to validate the list() component filter client-side.
DLQ_COMPONENTS = ("ingestor", "join", "sink", "dedup", "oltp-receiver")


class DLQ(_OSSDLQ):
"""Enterprise Dead Letter Queue client.

Extends the open-source :class:`glassflow.etl.dlq.DLQ` with message
management: a non-destructive paginated :meth:`list`, and
:meth:`reprocess`/:meth:`discard` (plus their ``*_all`` variants) that move
messages back into the pipeline or permanently remove them.

Backend entitlement is enforced server-side; calling these against a backend
that is not licensed for them raises :class:`FeatureNotLicensedError`.
"""

def list(
self,
batch_size: int = 100,
cursor: Optional[str] = None,
component: Optional[str] = None,
) -> Dict[str, Any]:
"""Read messages from the DLQ without removing them.

The non-destructive successor to :meth:`consume`. Each message in the
returned page carries a stable ``message_id`` (NATS sequence number, as
a string) plus ``component``, ``error``, ``original_message`` and
``received_at``. The ``message_id`` values are what :meth:`reprocess`
and :meth:`discard` act on in ``mode=selected``.

Args:
batch_size: Number of messages per page (between 1 and 1000).
cursor: NATS sequence to resume from, taken from the previous page's
``next_cursor``; omit for the first page.
component: Filter to messages from a single data-plane component;
must be one of :data:`DLQ_COMPONENTS` (``ingestor``, ``join``,
``sink``, ``dedup``, ``oltp-receiver``); omit for all components.

Returns:
A dict with ``messages`` (list of message dicts), ``has_more``
(bool), and ``next_cursor`` (str, present when ``has_more`` is
true; pass it back as ``cursor`` to fetch the next page).

Raises:
ValueError: If ``batch_size`` or ``component`` is invalid.
APIError: If the API request fails.
"""
if (
not isinstance(batch_size, int)
or batch_size < 1
or batch_size > MAX_SELECTED_MESSAGE_IDS
):
raise ValueError(
f"batch_size must be an integer between 1 and "
f"{MAX_SELECTED_MESSAGE_IDS}"
)

if component is not None and component not in DLQ_COMPONENTS:
raise ValueError(f"component must be one of {', '.join(DLQ_COMPONENTS)}")

params: Dict[str, Any] = {"batch_size": batch_size}
if cursor is not None:
params["cursor"] = cursor
if component is not None:
params["component"] = component

response = self._request("GET", f"{self.endpoint}/list", params=params)
if response.status_code == 204 or not response.content:
return {"messages": [], "has_more": False}
return response.json()

def list_iter(
self,
batch_size: int = 100,
component: Optional[str] = None,
cursor: Optional[str] = None,
) -> Iterator[Dict[str, Any]]:
"""Lazily iterate over every DLQ message, paging via the cursor for you.

The streaming companion to :meth:`list`: it calls :meth:`list` page by
page and yields each message, so callers do not manage the cursor
themselves. Memory stays flat (one page at a time) and it composes with
``itertools`` (e.g. ``itertools.islice`` for the first N).

Args:
batch_size: Messages fetched per underlying page request (1-1000).
component: Optional component filter; see :meth:`list`.
cursor: Optional starting cursor (e.g. to resume a previous run);
omit to start at the beginning of the DLQ.

Yields:
Individual DLQ message dicts.
"""
while True:
page = self.list(batch_size=batch_size, cursor=cursor, component=component)
yield from page.get("messages", [])
if not page.get("has_more"):
return
cursor = page.get("next_cursor")
# Defensive: a truthy has_more without a cursor would loop forever.
if not cursor:
return

def reprocess(self, message_ids: List[str]) -> Dict[str, Any]:
"""Move specific messages from the DLQ back into the pipeline input.

Args:
message_ids: ``message_id`` values (from :meth:`list`) to reprocess;
must be non-empty and at most ``MAX_SELECTED_MESSAGE_IDS``.

Returns:
Dict with ``request_id`` and ``status`` ("accepted"). The republish
happens asynchronously.

Raises:
ValueError: If ``message_ids`` is empty or too large.
FeatureNotLicensedError: If the backend is not licensed for this.
PipelineNotRunningError: If the pipeline is not in the Running state.
APIError: If the API request fails.
"""
return self._action("reprocess", "selected", message_ids)

def reprocess_all(self) -> Dict[str, Any]:
"""Reprocess every message currently in the DLQ (up to the head at
request time). See :meth:`reprocess`."""
return self._action("reprocess", "all", None)

def discard(self, message_ids: List[str]) -> Dict[str, Any]:
"""Permanently remove specific messages from the DLQ without
reprocessing them.

Args:
message_ids: ``message_id`` values (from :meth:`list`) to discard;
must be non-empty and at most ``MAX_SELECTED_MESSAGE_IDS``.

Returns:
Dict with ``request_id`` and ``discarded_count``.

Raises:
ValueError: If ``message_ids`` is empty or too large.
FeatureNotLicensedError: If the backend is not licensed for this.
APIError: If the API request fails.
"""
return self._action("discard", "selected", message_ids)

def discard_all(self) -> Dict[str, Any]:
"""Permanently remove every message currently in the DLQ. See
:meth:`discard`."""
return self._action("discard", "all", None)

# Deprecated, inherited operations -------------------------------------

def consume(self, batch_size: int = 100) -> List[Dict[str, Any]]:
"""Deprecated: use :meth:`list`.

The ``/dlq/consume`` endpoint is being removed in favour of the
non-destructive ``/dlq/list``. This override delegates to :meth:`list`
so existing callers keep working through the transition.
"""
warnings.warn(
"DLQ.consume() is deprecated and the /dlq/consume endpoint is being "
"removed; use DLQ.list() (non-destructive).",
DeprecationWarning,
stacklevel=2,
)
return self.list(batch_size=batch_size).get("messages", [])

def purge(self) -> None:
"""Deprecated: use :meth:`discard_all`.

``/dlq/purge`` remains for backward compatibility but is superseded by
:meth:`discard_all`.
"""
warnings.warn(
"DLQ.purge() is deprecated; use DLQ.discard_all().",
DeprecationWarning,
stacklevel=2,
)
super().purge()

def _action(
self, action: str, mode: str, message_ids: Optional[List[str]]
) -> Dict[str, Any]:
body: Dict[str, Any] = {"mode": mode}
if mode == "selected":
if not message_ids:
raise ValueError(
"message_ids must be non-empty when selecting messages"
)
if len(message_ids) > MAX_SELECTED_MESSAGE_IDS:
raise ValueError(
f"message_ids cannot exceed {MAX_SELECTED_MESSAGE_IDS} entries"
)
body["message_ids"] = message_ids

try:
response = self._request("POST", f"{self.endpoint}/{action}", json=body)
if response.status_code == 204 or not response.content:
return {}
return response.json()
except errors.ForbiddenError as e:
raise errors.FeatureNotLicensedError(
status_code=e.status_code,
message=(f"DLQ {action} requires a GlassFlow Enterprise license"),
response=e.response,
) from e
except errors.ConflictError as e:
# Reprocess replays through the running pipeline, so the API rejects
# it with 409 when the pipeline is not in the Running state. Discard
# acts on the queue directly and has no such constraint.
if action == "reprocess":
raise errors.PipelineNotRunningError(
status_code=e.status_code,
message=(
"Pipeline must be in the Running state to reprocess DLQ "
"messages"
),
response=e.response,
) from e
raise
20 changes: 17 additions & 3 deletions src/glassflow/ee/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,25 @@

from glassflow.etl.pipeline import Pipeline as _OSSPipeline

from .dlq import DLQ


class Pipeline(_OSSPipeline):
"""Enterprise Pipeline.

Extends the open-source :class:`glassflow.etl.pipeline.Pipeline`. Currently
a pass-through; Enterprise-only pipeline capabilities are added here in
follow-up work.
Extends the open-source :class:`glassflow.etl.pipeline.Pipeline`. Its ``dlq``
property exposes the Enterprise :class:`~.dlq.DLQ` (with
``list``/``reprocess``/``discard``). Construction is inherited unchanged;
only the DLQ collaborator class is swapped via ``_dlq_class``.
"""

_dlq_class = DLQ

@property
def dlq(self) -> DLQ:
"""Get the Enterprise DLQ client for this pipeline."""
return self._dlq

@dlq.setter
def dlq(self, dlq: DLQ) -> None:
self._dlq = dlq
2 changes: 2 additions & 0 deletions src/glassflow/etl/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def _raise_api_error(response: httpx.Response) -> None:
raise errors.ForbiddenError(status_code, message, response=response)
elif status_code == 404:
raise errors.NotFoundError(status_code, message, response=response)
elif status_code == 409:
raise errors.ConflictError(status_code, message, response=response)
elif status_code == 422:
raise errors.UnprocessableContentError(
status_code, message, response=response
Expand Down
17 changes: 17 additions & 0 deletions src/glassflow/etl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@ class ForbiddenError(APIError):
"""Raised on 403 Forbidden errors."""


class FeatureNotLicensedError(ForbiddenError):
"""Raised when an Enterprise-only capability is invoked against a backend
that is not licensed for it (the API responds 403). Subclasses
ForbiddenError so existing 403 handling still catches it."""


class ConflictError(APIError):
"""Raised on 409 Conflict errors."""


class PipelineNotRunningError(ConflictError):
"""Raised when an operation requires a Running pipeline but the pipeline is
in another state (the API responds 409). For example, DLQ reprocessing
replays messages through the running pipeline and is rejected when the
pipeline is stopped, terminated, or failed."""


class UnprocessableContentError(APIError):
"""Raised on 422 Unprocessable Content errors."""

Expand Down
Loading