From e857340005984d404afc4d54ec0542e07f3146e5 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Thu, 11 Jun 2026 15:25:19 +0200 Subject: [PATCH 01/16] feat(ee): add Enterprise client scaffold and extensible source/format model layer Introduce `glassflow.ee` as a superset of `glassflow.etl`: - `ee.Client` / `ee.Pipeline` subclass the OSS classes; `Client` returns the EE `Pipeline` via a `_pipeline_class` seam so editions propagate without re-implementing construction. Lay the OSS foundation for open-ended, edition-extensible config models: - Add source and format registries (`models/registry.py`) with dispatch by `type` string, replacing the static discriminated union on `PipelineConfig.sources`. New source types and formats register themselves at import with no change to OSS models. - Add a `format` field to `KafkaSource` (tagged object: `json` in OSS, `avro`/`protobuf` to come in EE) with a polymorphic validation hook. `SerializeAsAny` preserves subclass-only fields on dump. - Add `_config_class` / `_config_patch_class` seams on `Pipeline` and make `validate_config` a classmethod so editions can use an extended config. - Keep a `_dlq_class` seam wired (pointing at the OSS DLQ) so the Enterprise DLQ can drop in via a follow-up PR with no OSS churn. DLQ-specific Enterprise capabilities (reprocessing/index consumption) are intentionally deferred to a follow-up PR (ETL-1187). Co-Authored-By: Claude Opus 4.8 (1M context) --- src/glassflow/ee/__init__.py | 33 +++ src/glassflow/ee/client.py | 17 ++ src/glassflow/ee/pipeline.py | 12 ++ src/glassflow/etl/client.py | 24 ++- src/glassflow/etl/models/__init__.py | 4 + src/glassflow/etl/models/pipeline.py | 27 ++- src/glassflow/etl/models/registry.py | 75 +++++++ src/glassflow/etl/models/sources/__init__.py | 19 +- src/glassflow/etl/models/sources/formats.py | 46 ++++ src/glassflow/etl/models/sources/kafka.py | 22 +- src/glassflow/etl/pipeline.py | 27 ++- tests/test_ee.py | 45 ++++ tests/test_models/test_source_formats.py | 209 +++++++++++++++++++ 13 files changed, 540 insertions(+), 20 deletions(-) create mode 100644 src/glassflow/ee/__init__.py create mode 100644 src/glassflow/ee/client.py create mode 100644 src/glassflow/ee/pipeline.py create mode 100644 src/glassflow/etl/models/registry.py create mode 100644 src/glassflow/etl/models/sources/formats.py create mode 100644 tests/test_ee.py create mode 100644 tests/test_models/test_source_formats.py diff --git a/src/glassflow/ee/__init__.py b/src/glassflow/ee/__init__.py new file mode 100644 index 0000000..e54c555 --- /dev/null +++ b/src/glassflow/ee/__init__.py @@ -0,0 +1,33 @@ +""" +GlassFlow Enterprise SDK. + +Drop-in superset of :mod:`glassflow.etl` that adds Enterprise-only capabilities. +Use it exactly like the OSS client:: + + from glassflow.ee import Client + + client = Client(host="https://...") + pipeline = client.get_pipeline("my-pipeline") + +All open-source models are re-exported from :mod:`glassflow.etl` for +convenience, so a single import path covers both tiers. +""" + +from glassflow.etl.models import ( + JoinConfig, + PipelineConfig, + SinkConfig, + SourceConfig, +) + +from .client import Client +from .pipeline import Pipeline + +__all__ = [ + "Pipeline", + "Client", + "PipelineConfig", + "SourceConfig", + "SinkConfig", + "JoinConfig", +] diff --git a/src/glassflow/ee/client.py b/src/glassflow/ee/client.py new file mode 100644 index 0000000..3932fb6 --- /dev/null +++ b/src/glassflow/ee/client.py @@ -0,0 +1,17 @@ +from __future__ import annotations + +from glassflow.etl.client import Client as _OSSClient + +from .pipeline import Pipeline + + +class Client(_OSSClient): + """Enterprise GlassFlow client. + + Extends the open-source :class:`glassflow.etl.client.Client`. Every pipeline + it returns is the Enterprise :class:`~.pipeline.Pipeline`, giving + Enterprise-only capabilities a home as they are added. Backend entitlement + is enforced server-side. + """ + + _pipeline_class = Pipeline diff --git a/src/glassflow/ee/pipeline.py b/src/glassflow/ee/pipeline.py new file mode 100644 index 0000000..f9feec6 --- /dev/null +++ b/src/glassflow/ee/pipeline.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +from glassflow.etl.pipeline import Pipeline as _OSSPipeline + + +class Pipeline(_OSSPipeline): + """Enterprise Pipeline. + + Extends the open-source :class:`glassflow.etl.pipeline.Pipeline`. Currently + a pass-through; Enterprise-only pipeline capabilities are added here in + follow-up work. + """ diff --git a/src/glassflow/etl/client.py b/src/glassflow/etl/client.py index 333a400..0a8e85d 100644 --- a/src/glassflow/etl/client.py +++ b/src/glassflow/etl/client.py @@ -14,6 +14,12 @@ class Client(APIClient): ENDPOINT = "/api/v1/pipeline" + # Class of the Pipeline this client constructs and returns. Editions + # (e.g. the enterprise client) override this so every pipeline handed back + # is their own subclass, propagating edition-specific behaviour down the + # Client -> Pipeline -> DLQ chain without re-implementing these methods. + _pipeline_class: type[Pipeline] = Pipeline + def __init__(self, host: str | None = None) -> None: """Initialize the PipelineManager class. @@ -35,7 +41,7 @@ def get_pipeline(self, pipeline_id: str): PipelineNotFoundError: If pipeline is not found APIError: If the API request fails """ - return Pipeline(host=self.host, pipeline_id=pipeline_id).get() + return self._pipeline_class(host=self.host, pipeline_id=pipeline_id).get() def list_pipelines(self) -> List[dict]: """Returns a list of available pipelines. @@ -91,9 +97,13 @@ def create_pipeline( "pipeline_config_json_path must be provided" ) if pipeline_config_yaml_path is not None: - pipeline = Pipeline.from_yaml(pipeline_config_yaml_path, host=self.host) + pipeline = self._pipeline_class.from_yaml( + pipeline_config_yaml_path, host=self.host + ) elif pipeline_config_json_path is not None: - pipeline = Pipeline.from_json(pipeline_config_json_path, host=self.host) + pipeline = self._pipeline_class.from_json( + pipeline_config_json_path, host=self.host + ) else: if ( pipeline_config_yaml_path is not None @@ -103,7 +113,7 @@ def create_pipeline( "Either pipeline_config or pipeline_config_yaml_path or " "pipeline_config_json_path must be provided" ) - pipeline = Pipeline(config=pipeline_config, host=self.host) + pipeline = self._pipeline_class(config=pipeline_config, host=self.host) return pipeline.create() @@ -120,7 +130,9 @@ def stop_pipeline(self, pipeline_id: str, terminate: bool = False) -> None: PipelineNotFoundError: If pipeline is not found APIError: If the API request fails """ - Pipeline(host=self.host, pipeline_id=pipeline_id).stop(terminate=terminate) + self._pipeline_class(host=self.host, pipeline_id=pipeline_id).stop( + terminate=terminate + ) def delete_pipeline(self, pipeline_id: str) -> None: """Deletes the pipeline with the given ID. @@ -134,7 +146,7 @@ def delete_pipeline(self, pipeline_id: str) -> None: PipelineNotFoundError: If pipeline is not found APIError: If the API request fails """ - Pipeline(host=self.host, pipeline_id=pipeline_id).delete() + self._pipeline_class(host=self.host, pipeline_id=pipeline_id).delete() def migrate_pipeline_v2_to_v3( self, pipeline_config: dict[str, Any] diff --git a/src/glassflow/etl/models/__init__.py b/src/glassflow/etl/models/__init__.py index 9628469..54df304 100644 --- a/src/glassflow/etl/models/__init__.py +++ b/src/glassflow/etl/models/__init__.py @@ -28,6 +28,7 @@ from .sources import ( AnySource, ConsumerGroupOffset, + JsonFormat, KafkaConnectionParams, KafkaConnectionParamsPatch, KafkaField, @@ -41,6 +42,7 @@ OTLPTracesSource, SchemaRegistry, SourceConfig, + SourceFormat, ) from .transforms import ( DedupTransform, @@ -75,6 +77,7 @@ "JoinConfig", "JoinConfigPatch", "JoinOutputField", + "JsonFormat", "JoinSourceConfig", "JoinType", "KafkaConnectionParams", @@ -106,6 +109,7 @@ "SourceBaseConfig", "SourceBaseConfigPatch", "SourceConfig", + "SourceFormat", "SourceResourceEntry", "SourceType", "StatelessTransform", diff --git a/src/glassflow/etl/models/pipeline.py b/src/glassflow/etl/models/pipeline.py index a51ef9f..e453b85 100644 --- a/src/glassflow/etl/models/pipeline.py +++ b/src/glassflow/etl/models/pipeline.py @@ -1,12 +1,20 @@ import re -from typing import List, Optional - -from pydantic import BaseModel, Field, field_validator, model_validator +from typing import Any, List, Optional + +from pydantic import ( + BaseModel, + Field, + SerializeAsAny, + field_validator, + model_validator, +) from .base import CaseInsensitiveStrEnum from .metadata import MetadataConfig +from .registry import resolve_source from .resources import PipelineResourcesConfig from .sink import SinkConfig, SinkConfigPatch +from .source import SourceBaseConfig from .sources import KafkaSource, OTLPSource, SourceConfig from .transforms import ( DedupTransform, @@ -38,13 +46,24 @@ class PipelineConfig(BaseModel): version: PipelineVersion = Field(default=PipelineVersion.V3) pipeline_id: str name: Optional[str] = Field(default=None) - sources: List[SourceConfig] + # Each source is dispatched to its concrete class via the source registry + # (see resolve_sources), so editions can add source types without + # redefining a union here. SerializeAsAny preserves subclass-only fields. + sources: List[SerializeAsAny[SourceBaseConfig]] transforms: Optional[List[TransformEntry]] = Field(default=None) join: Optional[JoinConfig] = Field(default=None) sink: SinkConfig metadata: Optional[MetadataConfig] = Field(default=MetadataConfig()) resources: Optional[PipelineResourcesConfig] = Field(default=None) + @field_validator("sources", mode="before") + @classmethod + def resolve_sources(cls, value: Any) -> Any: + """Dispatch each raw source dict to its concrete registered class.""" + if isinstance(value, list): + return [resolve_source(item) for item in value] + return value + @field_validator("version") @classmethod def validate_version(cls, v: PipelineVersion) -> PipelineVersion: diff --git a/src/glassflow/etl/models/registry.py b/src/glassflow/etl/models/registry.py new file mode 100644 index 0000000..14b436c --- /dev/null +++ b/src/glassflow/etl/models/registry.py @@ -0,0 +1,75 @@ +"""Extensible registries for source types and source formats. + +Source types (``kafka``, ``otlp.logs``, ...) and source formats (``json``, +and Enterprise ``avro``/``protobuf``) are open-ended: the Enterprise SDK adds +new ones without modifying this package. Instead of a static Pydantic +discriminated union (whose members are frozen at class-definition time), models +annotate their fields with the base type and dispatch each value to the +concrete class by its ``type`` string at validation time, looked up here. + +Editions register their classes at import:: + + from glassflow.etl.models.registry import register_source, register_format + register_source(KinesisSource) + register_format(AvroFormat) +""" + +from __future__ import annotations + +from typing import Any, Dict, Type, TypeVar + +from pydantic import BaseModel + +_T = TypeVar("_T", bound=BaseModel) + +_SOURCE_CLASSES: Dict[str, Type[BaseModel]] = {} +_FORMAT_CLASSES: Dict[str, Type[BaseModel]] = {} + + +def _type_key(cls: Type[BaseModel]) -> str: + """Derive the discriminator key from a model's ``type`` field default.""" + field = cls.model_fields.get("type") + if field is None or field.default is None: + raise ValueError( + f"{cls.__name__} must define a 'type' field with a literal default " + "to be registered" + ) + return str(field.default) + + +def register_source(cls: Type[_T]) -> Type[_T]: + """Register a source class, keyed by its ``type`` default. Usable as a + decorator. Idempotent for re-imports.""" + _SOURCE_CLASSES[_type_key(cls)] = cls + return cls + + +def register_format(cls: Type[_T]) -> Type[_T]: + """Register a source format class, keyed by its ``type`` default. Usable as + a decorator.""" + _FORMAT_CLASSES[_type_key(cls)] = cls + return cls + + +def _resolve(value: Any, registry: Dict[str, Type[BaseModel]], kind: str) -> Any: + """Coerce a raw dict to the registered concrete model by its ``type``. + + Already-constructed model instances and non-dict values pass through + untouched so Pydantic can validate (or reject) them normally. + """ + if isinstance(value, BaseModel) or not isinstance(value, dict): + return value + type_value = value.get("type") + cls = registry.get(str(type_value)) if type_value is not None else None + if cls is None: + known = ", ".join(sorted(registry)) or "(none registered)" + raise ValueError(f"Unknown {kind} type {type_value!r}. Known types: {known}") + return cls.model_validate(value) + + +def resolve_source(value: Any) -> Any: + return _resolve(value, _SOURCE_CLASSES, "source") + + +def resolve_format(value: Any) -> Any: + return _resolve(value, _FORMAT_CLASSES, "format") diff --git a/src/glassflow/etl/models/sources/__init__.py b/src/glassflow/etl/models/sources/__init__.py index 6b8ebcd..6a4b2f3 100644 --- a/src/glassflow/etl/models/sources/__init__.py +++ b/src/glassflow/etl/models/sources/__init__.py @@ -14,7 +14,9 @@ from pydantic import Field # noqa: F401 +from ..registry import register_source from ..source import SourceBaseConfig, SourceBaseConfigPatch, SourceType +from .formats import JsonFormat, SourceFormat from .kafka import ( ConsumerGroupOffset, KafkaConnectionParams, @@ -34,7 +36,10 @@ OTLPTracesSource, ) -# Discriminated union -- Pydantic resolves the concrete class via the `type` field. +# Discriminated union -- kept as a convenience type alias for the OSS source +# set. The PipelineConfig.sources field no longer uses it directly; instead it +# dispatches via the source registry so editions can add new source types +# without redefining this union. SourceConfig = Annotated[ Union[KafkaSource, OTLPLogsSource, OTLPMetricsSource, OTLPTracesSource], Field(discriminator="type"), @@ -46,6 +51,15 @@ AnySource = SourceConfig AnySourcePatch = SourceConfigPatch +# Register the OSS source types so the registry-backed dispatch can resolve them. +for _source_cls in ( + KafkaSource, + OTLPLogsSource, + OTLPMetricsSource, + OTLPTracesSource, +): + register_source(_source_cls) + __all__ = [ # Base "SourceType", @@ -67,6 +81,9 @@ "OTLPSource", "OTLPSourcePatch", "OTLPTracesSource", + # Formats + "SourceFormat", + "JsonFormat", # Union "AnySource", "SourceConfig", diff --git a/src/glassflow/etl/models/sources/formats.py b/src/glassflow/etl/models/sources/formats.py new file mode 100644 index 0000000..aed8c28 --- /dev/null +++ b/src/glassflow/etl/models/sources/formats.py @@ -0,0 +1,46 @@ +"""Source format models. + +A source's ``format`` describes how its payload is encoded and how the schema +is supplied. It is a tagged object discriminated by ``type`` (``json`` in OSS; +``avro``/``protobuf`` in Enterprise), with format-specific configuration nested +under a key named after the type, e.g.:: + + {"type": "protobuf", "protobuf": {"schema_source": "inline", "proto_text": "..."}} + +New formats are added by subclassing :class:`SourceFormat` and registering them +with :func:`glassflow.etl.models.registry.register_format`; no change to this +package is required. +""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel + +from ..registry import register_format + + +class SourceFormat(BaseModel): + """Base class for all source formats.""" + + type: str + + def validate_against_registry(self, has_schema_registry: bool) -> None: + """Hook for formats to enforce schema-source rules that depend on the + parent source's schema registry configuration. + + Called by the source's after-validator with whether the source has a + schema registry configured. The base implementation is a no-op; + formats that can read their schema from a registry (e.g. Enterprise + avro/protobuf) override this to require that the registry is present + when ``schema_source`` is ``"registry"``. + """ + return None + + +@register_format +class JsonFormat(SourceFormat): + """JSON-encoded payloads. This is the default and carries no extra config.""" + + type: Literal["json"] = "json" diff --git a/src/glassflow/etl/models/sources/kafka.py b/src/glassflow/etl/models/sources/kafka.py index 3f6158c..fec07aa 100644 --- a/src/glassflow/etl/models/sources/kafka.py +++ b/src/glassflow/etl/models/sources/kafka.py @@ -2,11 +2,13 @@ from typing import Any, List, Literal, Optional -from pydantic import BaseModel, Field, model_validator +from pydantic import BaseModel, Field, SerializeAsAny, field_validator, model_validator from ..base import CaseInsensitiveStrEnum from ..data_types import KafkaDataType +from ..registry import resolve_format from ..source import SourceBaseConfig, SourceBaseConfigPatch, SourceType +from .formats import SourceFormat class KafkaProtocol(CaseInsensitiveStrEnum): @@ -86,6 +88,10 @@ class KafkaSource(SourceBaseConfig): schema_registry: Optional[SchemaRegistry] = Field(default=None) schema_version: Optional[str] = Field(default=None) schema_fields: Optional[List[KafkaField]] = Field(default=None) + # Payload format. ``None`` means JSON (default) and is omitted from the + # serialized config. Concrete subclass is resolved from the format registry + # by its ``type``; SerializeAsAny preserves subclass-only fields on dump. + format: Optional[SerializeAsAny[SourceFormat]] = Field(default=None) @model_validator(mode="before") @classmethod @@ -95,6 +101,12 @@ def validate_empty_schema_registry(cls, data: Any) -> Any: data.pop("schema_registry", None) return data + @field_validator("format", mode="before") + @classmethod + def resolve_format_field(cls, value: Any) -> Any: + """Dispatch a raw format dict to the concrete registered format class.""" + return resolve_format(value) + @model_validator(mode="after") def validate_schema_registry_requires_version(self) -> "KafkaSource": """Validate that schema_version is set when schema_registry is provided.""" @@ -104,6 +116,14 @@ def validate_schema_registry_requires_version(self) -> "KafkaSource": ) return self + @model_validator(mode="after") + def validate_format(self) -> "KafkaSource": + """Let the format enforce schema-source rules against this source's + schema registry configuration.""" + if self.format is not None: + self.format.validate_against_registry(self.schema_registry is not None) + return self + def update(self, patch: "KafkaSourcePatch") -> "KafkaSource": """Apply a patch to this source config.""" update_dict = self.model_copy(deep=True) diff --git a/src/glassflow/etl/pipeline.py b/src/glassflow/etl/pipeline.py index 1e34cda..9ec4590 100644 --- a/src/glassflow/etl/pipeline.py +++ b/src/glassflow/etl/pipeline.py @@ -18,6 +18,17 @@ class Pipeline(APIClient): ENDPOINT = "/api/v1/pipeline" + # Class of the DLQ client this pipeline constructs. Editions override this + # to have ``self.dlq`` expose their own DLQ subclass without re-implementing + # construction. The Enterprise DLQ itself lands in a follow-up PR. + _dlq_class: type[DLQ] = DLQ + + # Config models this pipeline validates/serializes. Editions override these + # to use their extended PipelineConfig (e.g. with EE-only source types and + # formats) without re-implementing __init__/get/update. + _config_class: type[models.PipelineConfig] = models.PipelineConfig + _config_patch_class: type[models.PipelineConfigPatch] = models.PipelineConfigPatch + def __init__( self, host: str | None = None, @@ -43,14 +54,14 @@ def __init__( if config is not None: if isinstance(config, dict): - self.config = models.PipelineConfig.model_validate(config) + self.config = self._config_class.model_validate(config) else: self.config = config self.pipeline_id = self.config.pipeline_id else: self.config = None - self._dlq = DLQ(pipeline_id=self.pipeline_id, host=host) + self._dlq = self._dlq_class(pipeline_id=self.pipeline_id, host=host) self.status: models.PipelineStatus | None = None def get( @@ -85,9 +96,9 @@ def get( event_name="PipelineGet", **kwargs, ) - self.config = models.PipelineConfig.model_validate(response.json()) + self.config = self._config_class.model_validate(response.json()) self.health() - self._dlq = DLQ(pipeline_id=self.pipeline_id, host=self.host) + self._dlq = self._dlq_class(pipeline_id=self.pipeline_id, host=self.host) return self def create(self) -> Pipeline: @@ -169,7 +180,7 @@ def update( """ self.get() # Get latest config if isinstance(config_patch, dict): - config_patch = models.PipelineConfigPatch.model_validate(config_patch) + config_patch = self._config_patch_class.model_validate(config_patch) else: config_patch = config_patch updated_config = self.config.update(config_patch) @@ -333,8 +344,8 @@ def from_json(cls, json_path: str, host: str | None = None) -> Pipeline: config = json.load(f) return cls(config=config, host=host) - @staticmethod - def validate_config(config: dict[str, Any]) -> bool: + @classmethod + def validate_config(cls, config: dict[str, Any]) -> bool: """ Validate a pipeline configuration. @@ -348,7 +359,7 @@ def validate_config(config: dict[str, Any]) -> bool: ValueError: If the configuration is invalid ValidationError: If the configuration fails Pydantic validation """ - models.PipelineConfig.model_validate(config) + cls._config_class.model_validate(config) return True @property diff --git a/tests/test_ee.py b/tests/test_ee.py new file mode 100644 index 0000000..53e6c5d --- /dev/null +++ b/tests/test_ee.py @@ -0,0 +1,45 @@ +"""Tests for the Enterprise (ee) client and pipeline scaffold. + +DLQ-specific Enterprise capabilities are covered in a follow-up PR. +""" + +import pytest + +from glassflow import ee +from glassflow.etl.client import Client as OSSClient +from glassflow.etl.pipeline import Pipeline as OSSPipeline + + +@pytest.fixture +def ee_pipeline(valid_config): + """Fixture for an Enterprise Pipeline with a valid config.""" + config = ee.PipelineConfig(**valid_config) + return ee.Pipeline(host="http://localhost:8080", config=config) + + +class TestEEInheritance: + """The ee classes extend, not replace, the OSS ones.""" + + def test_ee_client_subclasses_oss(self): + assert issubclass(ee.Client, OSSClient) + + def test_ee_pipeline_subclasses_oss(self): + assert issubclass(ee.Pipeline, OSSPipeline) + + +class TestEEWiring: + """Edition propagates from Client to the Pipeline it returns.""" + + def test_client_constructs_ee_pipeline(self): + assert ee.Client._pipeline_class is ee.Pipeline + + def test_get_pipeline_returns_ee_pipeline( + self, mock_success, get_pipeline_response, get_health_payload + ): + client = ee.Client(host="http://localhost:8080") + with mock_success( + [get_pipeline_response, get_health_payload("test-pipeline-id")] + ): + pipeline = client.get_pipeline("test-pipeline-id") + + assert isinstance(pipeline, ee.Pipeline) diff --git a/tests/test_models/test_source_formats.py b/tests/test_models/test_source_formats.py new file mode 100644 index 0000000..add330f --- /dev/null +++ b/tests/test_models/test_source_formats.py @@ -0,0 +1,209 @@ +"""Tests for source formats and the source/format registries. + +These prove the open-ended extension mechanism: an out-of-tree format or source +type (standing in for what the Enterprise SDK ships) plugs in via the registry +with no change to the OSS models, and SerializeAsAny preserves its fields on +dump. +""" + +from typing import Literal, Optional + +import pytest +from pydantic import BaseModel, model_validator + +from glassflow.etl import models +from glassflow.etl.models import registry +from glassflow.etl.models.base import CaseInsensitiveStrEnum +from glassflow.etl.models.sources.formats import SourceFormat + +# --- Out-of-tree EE-like definitions (not part of OSS) ---------------------- + + +class SchemaSource(CaseInsensitiveStrEnum): + INLINE = "inline" + REGISTRY = "registry" + + +class ProtobufConfig(BaseModel): + schema_source: SchemaSource + proto_text: Optional[str] = None + + @model_validator(mode="after") + def _require_inline_text(self): + if self.schema_source == SchemaSource.INLINE and not self.proto_text: + raise ValueError("proto_text is required when schema_source is 'inline'") + return self + + +class ProtobufFormat(SourceFormat): + type: Literal["protobuf"] = "protobuf" + protobuf: ProtobufConfig + + def validate_against_registry(self, has_schema_registry: bool) -> None: + if self.protobuf.schema_source == SchemaSource.REGISTRY and not ( + has_schema_registry + ): + raise ValueError( + "protobuf with schema_source 'registry' requires a schema registry " + "on the source" + ) + + +class KinesisSource(models.SourceBaseConfig): + type: Literal["kinesis"] = "kinesis" + stream_name: str + region: str + + +@pytest.fixture +def register_ee_types(): + """Register the out-of-tree types and clean up afterward.""" + registry.register_format(ProtobufFormat) + registry.register_source(KinesisSource) + yield + registry._FORMAT_CLASSES.pop("protobuf", None) + registry._SOURCE_CLASSES.pop("kinesis", None) + + +# The exact payload from the ticket discussion. +PROTOBUF_SOURCE = { + "type": "kafka", + "source_id": "events", + "connection_params": { + "brokers": ["kafka-controller-0.staging-cluster.glassflow.xyz:9094"], + "mechanism": "PLAIN", + "protocol": "SASL_PLAINTEXT", + "username": "glassflow", + "password": "secret", + }, + "topic": "test_proto_events_dedup", + "format": { + "type": "protobuf", + "protobuf": { + "schema_source": "inline", + "proto_text": 'syntax = "proto3";\nmessage Event {\n string id = 1;\n}', + }, + }, + "consumer_group_initial_offset": "earliest", +} + + +class TestJsonFormat: + """OSS default format behaviour.""" + + def test_format_defaults_to_none(self): + src = models.KafkaSource.model_validate( + { + "type": "kafka", + "source_id": "s1", + "connection_params": {"brokers": ["b:9092"], "protocol": "PLAINTEXT"}, + "topic": "t", + } + ) + assert src.format is None + # Omitted from the serialized config so OSS payloads are unchanged. + assert "format" not in src.model_dump(by_alias=True, exclude_none=True) + + def test_explicit_json_format_roundtrips(self): + src = models.KafkaSource.model_validate( + { + "type": "kafka", + "source_id": "s1", + "connection_params": {"brokers": ["b:9092"], "protocol": "PLAINTEXT"}, + "topic": "t", + "format": {"type": "json"}, + } + ) + assert isinstance(src.format, models.JsonFormat) + dumped = src.model_dump(by_alias=True, exclude_none=True) + assert dumped["format"] == {"type": "json"} + + def test_unknown_format_raises(self): + with pytest.raises(ValueError, match="Unknown format type 'avro'"): + models.KafkaSource.model_validate( + { + "type": "kafka", + "source_id": "s1", + "connection_params": { + "brokers": ["b:9092"], + "protocol": "PLAINTEXT", + }, + "topic": "t", + "format": {"type": "avro"}, + } + ) + + +class TestRegisteredFormat: + """An out-of-tree format plugs in via the registry.""" + + def test_protobuf_payload_roundtrips(self, register_ee_types): + src = models.KafkaSource.model_validate(PROTOBUF_SOURCE) + + assert isinstance(src.format, ProtobufFormat) + assert src.format.protobuf.schema_source == SchemaSource.INLINE + + # SerializeAsAny preserves the subclass-only nested config on dump, + # round-tripping to the exact wire shape. + dumped = src.model_dump(by_alias=True, exclude_none=True) + assert dumped["format"] == PROTOBUF_SOURCE["format"] + + def test_protobuf_inline_requires_text(self, register_ee_types): + bad = {**PROTOBUF_SOURCE} + bad["format"] = {"type": "protobuf", "protobuf": {"schema_source": "inline"}} + with pytest.raises(ValueError, match="proto_text is required"): + models.KafkaSource.model_validate(bad) + + def test_protobuf_registry_source_requires_schema_registry(self, register_ee_types): + # schema_source 'registry' but no schema_registry on the source -> hook fires. + bad = {**PROTOBUF_SOURCE} + bad["format"] = {"type": "protobuf", "protobuf": {"schema_source": "registry"}} + with pytest.raises(ValueError, match="requires a schema registry"): + models.KafkaSource.model_validate(bad) + + +class TestRegisteredSourceType: + """An out-of-tree source type plugs in via the registry.""" + + def _config(self, source: dict) -> dict: + return { + "pipeline_id": "p1", + "sources": [source], + "sink": { + "type": "clickhouse", + "connection_params": { + "host": "h", + "port": "9000", + "database": "db", + "username": "u", + "password": "p", + "secure": True, + }, + "table": "t", + "max_batch_size": 1, + "mapping": [ + {"name": "id", "column_name": "id", "column_type": "String"} + ], + }, + } + + def test_kinesis_source_dispatches_and_roundtrips(self, register_ee_types): + kinesis = { + "type": "kinesis", + "source_id": "k1", + "stream_name": "events", + "region": "eu-central-1", + } + cfg = models.PipelineConfig.model_validate(self._config(kinesis)) + + assert isinstance(cfg.sources[0], KinesisSource) + # SerializeAsAny keeps kinesis-only fields through the base-typed field. + dumped = cfg.model_dump(by_alias=True, exclude_none=True) + assert dumped["sources"][0]["stream_name"] == "events" + assert dumped["sources"][0]["region"] == "eu-central-1" + + def test_unknown_source_type_raises(self): + with pytest.raises(ValueError, match="Unknown source type 'pubsub'"): + models.PipelineConfig.model_validate( + self._config({"type": "pubsub", "source_id": "x"}) + ) From c56751120b81da66ca34bcc4dc8d28b17c9b873c Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 11 Jun 2026 13:27:04 +0000 Subject: [PATCH 02/16] docs: update coverage badge --- coverage.xml | 672 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 391 insertions(+), 281 deletions(-) diff --git a/coverage.xml b/coverage.xml index 8dfb618..bb216f4 100644 --- a/coverage.xml +++ b/coverage.xml @@ -1,15 +1,46 @@ - - + + /home/runner/work/glassflow-python-sdk/glassflow-python-sdk/src /home/runner/work/glassflow-python-sdk/glassflow-python-sdk/src/glassflow - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -19,7 +50,7 @@ - + @@ -88,7 +119,7 @@ - + @@ -98,48 +129,49 @@ - + - - - - - - - + + + + + - - - - - - - - + + + + + + + + - - - - + + + + - - - + + + + + - - - - - - + + - - + + + + + + + - + @@ -185,7 +217,7 @@ - + @@ -218,7 +250,7 @@ - + @@ -231,148 +263,151 @@ - - - - - - - - - + + + + - + - + + - + - - - - + + + + + + + + - - + + + + - - - + + - - - + + + - - - - - - - - - - - - - - + + + + + + + + + + + + + + + - - - - - - - - - - - - - + + + + + - - - - - + + + + + + + + + + + + - - - - + + - - - - - - + + + + + + - + + + - - - + + + - - - - - - - - + + + + + + - - - - - - - - + + + + + + + - - + + + - + + + - - + + - - - - - - - + + - + - - + + + + + - - + + + + + + - + @@ -405,9 +440,9 @@ - + - + @@ -418,11 +453,11 @@ - - + + - + @@ -437,7 +472,7 @@ - + @@ -476,7 +511,7 @@ - + @@ -562,7 +597,7 @@ - + @@ -571,142 +606,185 @@ - + - - - - - - + + + + + + + - - - - - - - + + - - + + - - + - - - - + + + + - - + + + + + + + - - - - + - - - - - + - + + - + + + - - - - + + + + + + - - - - - + + + + + + - - - - + + + + - - - - - - - - - - + + + + + + - - - + + + + + + - - - + + + + - - - + + - + - + - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + @@ -784,7 +862,7 @@ - + @@ -866,7 +944,7 @@ - + @@ -887,24 +965,44 @@ - + - + + + + + + + + + + + + + + + + + + + + + + - + + - - - + @@ -912,29 +1010,29 @@ - - + + - - + + - - - + + + - - + + - - - - - + + + + + @@ -945,65 +1043,77 @@ + - - + + - - - - + + + + - - - - + - + + + - - + + + - - + + - - + + + + - - - - - + - - - + + + + + + + + + + + + + + + + - + @@ -1021,9 +1131,9 @@ - + - + @@ -1053,7 +1163,7 @@ - + @@ -1076,7 +1186,7 @@ - + @@ -1084,7 +1194,7 @@ - + @@ -1140,7 +1250,7 @@ - + From eb395b44a6eb1a73f8acae135b7b4c83d134e9bd Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Thu, 11 Jun 2026 18:27:24 +0200 Subject: [PATCH 03/16] feat(ee): add DLQ list / reprocess / discard to the Enterprise client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the SDK side of DLQ message processing (ETL-1187) on the Enterprise DLQ: - `ee.DLQ.list(batch_size, cursor)` — non-destructive, paginated read. Messages now carry `message_id` (NATS seq), `source`, and `received_at`. - `reprocess(message_ids)` / `reprocess_all()` — move messages back into the pipeline input (POST /dlq/reprocess, mode=selected|all). - `discard(message_ids)` / `discard_all()` — permanently remove (POST /dlq/discard, mode=selected|all). Deprecate the inherited operations whose endpoints are changing under EE: - `consume()` warns and delegates to `list()` (the /dlq/consume endpoint is being removed in favour of non-destructive /dlq/list), so callers keep working through the transition. - `purge()` warns and points to `discard_all()`; the legacy endpoint still works. message_ids are validated client-side (non-empty, <=1000) to mirror the backend and fail fast. A 403 from an unlicensed backend maps to FeatureNotLicensedError (re-added), which subclasses ForbiddenError so existing 403 handling still catches it. `ee.Pipeline` wires `_dlq_class` to the EE DLQ so `client.get_pipeline(id).dlq` exposes these methods. Contract per the DLQ Message Processing design doc / glassflow-etl-ee API. The backend reprocess/discard handlers and the consume->list rename are still in flight; the SDK is written against the agreed contract and tests mock the HTTP layer. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/glassflow/ee/__init__.py | 3 + src/glassflow/ee/dlq.py | 170 ++++++++++++++++++++++++++++++++ src/glassflow/ee/pipeline.py | 20 +++- src/glassflow/etl/errors.py | 6 ++ tests/test_ee_dlq.py | 182 +++++++++++++++++++++++++++++++++++ 5 files changed, 378 insertions(+), 3 deletions(-) create mode 100644 src/glassflow/ee/dlq.py create mode 100644 tests/test_ee_dlq.py diff --git a/src/glassflow/ee/__init__.py b/src/glassflow/ee/__init__.py index e54c555..0aff448 100644 --- a/src/glassflow/ee/__init__.py +++ b/src/glassflow/ee/__init__.py @@ -8,6 +8,7 @@ client = Client(host="https://...") pipeline = client.get_pipeline("my-pipeline") + pipeline.dlq.reprocess_all() # Enterprise DLQ management All open-source models are re-exported from :mod:`glassflow.etl` for convenience, so a single import path covers both tiers. @@ -21,11 +22,13 @@ ) from .client import Client +from .dlq import DLQ from .pipeline import Pipeline __all__ = [ "Pipeline", "Client", + "DLQ", "PipelineConfig", "SourceConfig", "SinkConfig", diff --git a/src/glassflow/ee/dlq.py b/src/glassflow/ee/dlq.py new file mode 100644 index 0000000..553f5da --- /dev/null +++ b/src/glassflow/ee/dlq.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +import warnings +from typing import Any, Dict, List, Optional + +from glassflow.etl import errors +from glassflow.etl.dlq import DLQ as _OSSDLQ + +# Mirrors the backend cap on message_ids per mode=selected request +# (dlqSelectedMaxMessageIDs in glassflow-etl-ee). Validated client-side for a +# fast, offline error instead of a round-trip 400. +MAX_SELECTED_MESSAGE_IDS = 1000 + + +class DLQ(_OSSDLQ): + """Enterprise Dead Letter Queue client. + + Extends the open-source :class:`glassflow.etl.dlq.DLQ` with message + management: a non-destructive paginated :meth:`list`, and + :meth:`reprocess`/:meth:`discard` (plus their ``*_all`` variants) that move + messages back into the pipeline or permanently remove them. + + Backend entitlement is enforced server-side; calling these against a backend + that is not licensed for them raises :class:`FeatureNotLicensedError`. + """ + + def list( + self, batch_size: int = 100, cursor: Optional[str] = None + ) -> List[Dict[str, Any]]: + """Read messages from the DLQ without removing them. + + The non-destructive successor to :meth:`consume`. Each message carries a + stable ``message_id`` (NATS sequence number, as a string) plus + ``source``, ``component``, ``error``, ``original_message`` and + ``received_at``. The ``message_id`` values are what :meth:`reprocess` + and :meth:`discard` act on in ``mode=selected``. + + Args: + batch_size: Number of messages to read (between 1 and 100). + cursor: Opaque pagination cursor from a previous page; omit for the + first page. + + Returns: + List of DLQ message dicts. + + Raises: + ValueError: If ``batch_size`` is invalid. + APIError: If the API request fails. + """ + if ( + not isinstance(batch_size, int) + or batch_size < 1 + or batch_size > self._max_batch_size + ): + raise ValueError( + f"batch_size must be an integer between 1 and {self._max_batch_size}" + ) + + params: Dict[str, Any] = {"batch_size": batch_size} + if cursor is not None: + params["cursor"] = cursor + + response = self._request("GET", f"{self.endpoint}/list", params=params) + if response.status_code == 204 or not response.content: + return [] + return response.json() + + def reprocess(self, message_ids: List[str]) -> Dict[str, Any]: + """Move specific messages from the DLQ back into the pipeline input. + + Args: + message_ids: ``message_id`` values (from :meth:`list`) to reprocess; + must be non-empty and at most ``MAX_SELECTED_MESSAGE_IDS``. + + Returns: + Dict with ``request_id`` and ``status`` ("accepted"). The republish + happens asynchronously. + + Raises: + ValueError: If ``message_ids`` is empty or too large. + FeatureNotLicensedError: If the backend is not licensed for this. + APIError: If the API request fails (e.g. 409 if the pipeline is not + Running). + """ + return self._action("reprocess", "selected", message_ids) + + def reprocess_all(self) -> Dict[str, Any]: + """Reprocess every message currently in the DLQ (up to the head at + request time). See :meth:`reprocess`.""" + return self._action("reprocess", "all", None) + + def discard(self, message_ids: List[str]) -> Dict[str, Any]: + """Permanently remove specific messages from the DLQ without + reprocessing them. + + Args: + message_ids: ``message_id`` values (from :meth:`list`) to discard; + must be non-empty and at most ``MAX_SELECTED_MESSAGE_IDS``. + + Returns: + Dict with ``request_id`` and ``discarded_count``. + + Raises: + ValueError: If ``message_ids`` is empty or too large. + FeatureNotLicensedError: If the backend is not licensed for this. + APIError: If the API request fails. + """ + return self._action("discard", "selected", message_ids) + + def discard_all(self) -> Dict[str, Any]: + """Permanently remove every message currently in the DLQ. See + :meth:`discard`.""" + return self._action("discard", "all", None) + + # Deprecated, inherited operations ------------------------------------- + + def consume(self, batch_size: int = 100) -> List[Dict[str, Any]]: + """Deprecated: use :meth:`list`. + + The ``/dlq/consume`` endpoint is being removed in favour of the + non-destructive ``/dlq/list``. This override delegates to :meth:`list` + so existing callers keep working through the transition. + """ + warnings.warn( + "DLQ.consume() is deprecated and the /dlq/consume endpoint is being " + "removed; use DLQ.list() (non-destructive).", + DeprecationWarning, + stacklevel=2, + ) + return self.list(batch_size=batch_size) + + def purge(self) -> None: + """Deprecated: use :meth:`discard_all`. + + ``/dlq/purge`` remains for backward compatibility but is superseded by + :meth:`discard_all`. + """ + warnings.warn( + "DLQ.purge() is deprecated; use DLQ.discard_all().", + DeprecationWarning, + stacklevel=2, + ) + super().purge() + + def _action( + self, action: str, mode: str, message_ids: Optional[List[str]] + ) -> Dict[str, Any]: + body: Dict[str, Any] = {"mode": mode} + if mode == "selected": + if not message_ids: + raise ValueError( + "message_ids must be non-empty when selecting messages" + ) + if len(message_ids) > MAX_SELECTED_MESSAGE_IDS: + raise ValueError( + f"message_ids cannot exceed {MAX_SELECTED_MESSAGE_IDS} entries" + ) + body["message_ids"] = message_ids + + try: + response = self._request("POST", f"{self.endpoint}/{action}", json=body) + if response.status_code == 204 or not response.content: + return {} + return response.json() + except errors.ForbiddenError as e: + raise errors.FeatureNotLicensedError( + status_code=e.status_code, + message=(f"DLQ {action} requires a GlassFlow Enterprise license"), + response=e.response, + ) from e diff --git a/src/glassflow/ee/pipeline.py b/src/glassflow/ee/pipeline.py index f9feec6..7dd57fd 100644 --- a/src/glassflow/ee/pipeline.py +++ b/src/glassflow/ee/pipeline.py @@ -2,11 +2,25 @@ from glassflow.etl.pipeline import Pipeline as _OSSPipeline +from .dlq import DLQ + class Pipeline(_OSSPipeline): """Enterprise Pipeline. - Extends the open-source :class:`glassflow.etl.pipeline.Pipeline`. Currently - a pass-through; Enterprise-only pipeline capabilities are added here in - follow-up work. + Extends the open-source :class:`glassflow.etl.pipeline.Pipeline`. Its ``dlq`` + property exposes the Enterprise :class:`~.dlq.DLQ` (with + ``list``/``reprocess``/``discard``). Construction is inherited unchanged; + only the DLQ collaborator class is swapped via ``_dlq_class``. """ + + _dlq_class = DLQ + + @property + def dlq(self) -> DLQ: + """Get the Enterprise DLQ client for this pipeline.""" + return self._dlq + + @dlq.setter + def dlq(self, dlq: DLQ) -> None: + self._dlq = dlq diff --git a/src/glassflow/etl/errors.py b/src/glassflow/etl/errors.py index b3651f7..5292103 100644 --- a/src/glassflow/etl/errors.py +++ b/src/glassflow/etl/errors.py @@ -34,6 +34,12 @@ class ForbiddenError(APIError): """Raised on 403 Forbidden errors.""" +class FeatureNotLicensedError(ForbiddenError): + """Raised when an Enterprise-only capability is invoked against a backend + that is not licensed for it (the API responds 403). Subclasses + ForbiddenError so existing 403 handling still catches it.""" + + class UnprocessableContentError(APIError): """Raised on 422 Unprocessable Content errors.""" diff --git a/tests/test_ee_dlq.py b/tests/test_ee_dlq.py new file mode 100644 index 0000000..591894d --- /dev/null +++ b/tests/test_ee_dlq.py @@ -0,0 +1,182 @@ +"""Tests for the Enterprise DLQ: list / reprocess / discard.""" + +from unittest.mock import patch + +import pytest + +from glassflow import ee +from glassflow.etl import errors +from glassflow.etl.dlq import DLQ as OSSDLQ +from tests.data import mock_responses + + +@pytest.fixture +def ee_dlq(): + return ee.DLQ(host="http://localhost:8080", pipeline_id="test-pipeline") + + +@pytest.fixture +def ee_pipeline(valid_config): + config = ee.PipelineConfig(**valid_config) + return ee.Pipeline(host="http://localhost:8080", config=config) + + +class TestEEDLQWiring: + def test_ee_dlq_subclasses_oss(self): + assert issubclass(ee.DLQ, OSSDLQ) + + def test_pipeline_exposes_ee_dlq(self, ee_pipeline): + assert isinstance(ee_pipeline.dlq, ee.DLQ) + + def test_dlq_setter_still_works(self, ee_pipeline, ee_dlq): + ee_pipeline.dlq = ee_dlq + assert ee_pipeline.dlq is ee_dlq + + def test_get_pipeline_dlq_is_ee( + self, mock_success, get_pipeline_response, get_health_payload + ): + client = ee.Client(host="http://localhost:8080") + with mock_success( + [get_pipeline_response, get_health_payload("test-pipeline-id")] + ): + pipeline = client.get_pipeline("test-pipeline-id") + assert isinstance(pipeline.dlq, ee.DLQ) + + +class TestList: + def test_list_success(self, ee_dlq, mock_success): + payload = [ + { + "message_id": "seq_101", + "source": "source-0", + "component": "sink", + "error": "connection refused", + "original_message": "{}", + "received_at": "2026-05-29T14:00:00Z", + } + ] + with mock_success(json_payloads=[payload]) as mock_get: + result = ee_dlq.list(batch_size=50) + + mock_get.assert_called_once_with( + "GET", f"{ee_dlq.endpoint}/list", params={"batch_size": 50} + ) + assert result == payload + + def test_list_with_cursor(self, ee_dlq, mock_success): + with mock_success(json_payloads=[[]]) as mock_get: + ee_dlq.list(batch_size=10, cursor="seq_200") + mock_get.assert_called_once_with( + "GET", + f"{ee_dlq.endpoint}/list", + params={"batch_size": 10, "cursor": "seq_200"}, + ) + + def test_list_empty_on_204(self, ee_dlq): + mock_response = mock_responses.create_mock_response_factory()( + status_code=204, json_data=None + ) + with patch("httpx.Client.request", return_value=mock_response): + assert ee_dlq.list() == [] + + @pytest.mark.parametrize("bad", [0, 101, -1, "10"]) + def test_list_invalid_batch_size(self, ee_dlq, bad): + with pytest.raises(ValueError, match="batch_size must be an integer"): + ee_dlq.list(batch_size=bad) + + +class TestReprocess: + def test_reprocess_selected(self, ee_dlq, mock_success): + with mock_success( + json_payloads=[{"request_id": "rep_1", "status": "accepted"}] + ) as mock_post: + result = ee_dlq.reprocess(["seq_101", "seq_102"]) + + mock_post.assert_called_once_with( + "POST", + f"{ee_dlq.endpoint}/reprocess", + json={"mode": "selected", "message_ids": ["seq_101", "seq_102"]}, + ) + assert result == {"request_id": "rep_1", "status": "accepted"} + + def test_reprocess_all(self, ee_dlq, mock_success): + with mock_success( + json_payloads=[{"request_id": "rep_2", "status": "accepted"}] + ) as mock_post: + ee_dlq.reprocess_all() + mock_post.assert_called_once_with( + "POST", f"{ee_dlq.endpoint}/reprocess", json={"mode": "all"} + ) + + def test_reprocess_empty_ids_raises(self, ee_dlq): + with pytest.raises(ValueError, match="must be non-empty"): + ee_dlq.reprocess([]) + + def test_reprocess_too_many_ids_raises(self, ee_dlq): + with pytest.raises(ValueError, match="cannot exceed 1000"): + ee_dlq.reprocess([str(i) for i in range(1001)]) + + +class TestDiscard: + def test_discard_selected(self, ee_dlq, mock_success): + with mock_success( + json_payloads=[{"request_id": "dis_1", "discarded_count": 2}] + ) as mock_post: + result = ee_dlq.discard(["seq_1", "seq_2"]) + + mock_post.assert_called_once_with( + "POST", + f"{ee_dlq.endpoint}/discard", + json={"mode": "selected", "message_ids": ["seq_1", "seq_2"]}, + ) + assert result == {"request_id": "dis_1", "discarded_count": 2} + + def test_discard_all(self, ee_dlq, mock_success): + with mock_success( + json_payloads=[{"request_id": "dis_2", "discarded_count": 9}] + ) as mock_post: + ee_dlq.discard_all() + mock_post.assert_called_once_with( + "POST", f"{ee_dlq.endpoint}/discard", json={"mode": "all"} + ) + + def test_discard_empty_ids_raises(self, ee_dlq): + with pytest.raises(ValueError, match="must be non-empty"): + ee_dlq.discard([]) + + +class TestDeprecatedInherited: + def test_consume_warns_and_delegates_to_list(self, ee_dlq, mock_success): + with mock_success(json_payloads=[[{"message_id": "seq_1"}]]) as mock_get: + with pytest.warns(DeprecationWarning, match="use DLQ.list"): + result = ee_dlq.consume(batch_size=25) + + # Hits /dlq/list, not /dlq/consume. + mock_get.assert_called_once_with( + "GET", f"{ee_dlq.endpoint}/list", params={"batch_size": 25} + ) + assert result == [{"message_id": "seq_1"}] + + def test_purge_warns_and_hits_purge_endpoint(self, ee_dlq, mock_success): + with mock_success() as mock_post: + with pytest.warns(DeprecationWarning, match="use DLQ.discard_all"): + ee_dlq.purge() + + mock_post.assert_called_once_with("POST", f"{ee_dlq.endpoint}/purge") + + +class TestEntitlement: + def test_forbidden_maps_to_feature_not_licensed(self, ee_dlq): + mock_response = mock_responses.create_mock_response_factory()( + status_code=403, json_data={"message": "Forbidden"} + ) + with patch( + "httpx.Client.request", + side_effect=mock_response.raise_for_status.side_effect, + ): + with pytest.raises(errors.FeatureNotLicensedError) as exc_info: + ee_dlq.reprocess_all() + + assert "Enterprise" in str(exc_info.value) + # Still catchable as a ForbiddenError by existing 403 handling. + assert isinstance(exc_info.value, errors.ForbiddenError) From 9f18cae6d879a8e38da8f1907abfa22969b1a92f Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Fri, 12 Jun 2026 15:30:14 +0200 Subject: [PATCH 04/16] feat(ee): surface 409 on reprocess as PipelineNotRunningError Reprocess replays messages through the running pipeline, so the API returns 409 Conflict when the pipeline is not in the Running state. Map it to a clear, catchable error instead of a generic APIError: - Add ConflictError (409) to the base client error mapping. - Add PipelineNotRunningError(ConflictError); reprocess/reprocess_all raise it on 409. Discard has no Running-state constraint, so its 409s stay ConflictError. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/glassflow/ee/dlq.py | 18 ++++++++++++++++-- src/glassflow/etl/api_client.py | 2 ++ src/glassflow/etl/errors.py | 11 +++++++++++ tests/test_ee_dlq.py | 33 +++++++++++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 2 deletions(-) diff --git a/src/glassflow/ee/dlq.py b/src/glassflow/ee/dlq.py index 553f5da..bbf53c0 100644 --- a/src/glassflow/ee/dlq.py +++ b/src/glassflow/ee/dlq.py @@ -79,8 +79,8 @@ def reprocess(self, message_ids: List[str]) -> Dict[str, Any]: Raises: ValueError: If ``message_ids`` is empty or too large. FeatureNotLicensedError: If the backend is not licensed for this. - APIError: If the API request fails (e.g. 409 if the pipeline is not - Running). + PipelineNotRunningError: If the pipeline is not in the Running state. + APIError: If the API request fails. """ return self._action("reprocess", "selected", message_ids) @@ -168,3 +168,17 @@ def _action( message=(f"DLQ {action} requires a GlassFlow Enterprise license"), response=e.response, ) from e + except errors.ConflictError as e: + # Reprocess replays through the running pipeline, so the API rejects + # it with 409 when the pipeline is not in the Running state. Discard + # acts on the queue directly and has no such constraint. + if action == "reprocess": + raise errors.PipelineNotRunningError( + status_code=e.status_code, + message=( + "Pipeline must be in the Running state to reprocess DLQ " + "messages" + ), + response=e.response, + ) from e + raise diff --git a/src/glassflow/etl/api_client.py b/src/glassflow/etl/api_client.py index 221f6ce..701a874 100644 --- a/src/glassflow/etl/api_client.py +++ b/src/glassflow/etl/api_client.py @@ -113,6 +113,8 @@ def _raise_api_error(response: httpx.Response) -> None: raise errors.ForbiddenError(status_code, message, response=response) elif status_code == 404: raise errors.NotFoundError(status_code, message, response=response) + elif status_code == 409: + raise errors.ConflictError(status_code, message, response=response) elif status_code == 422: raise errors.UnprocessableContentError( status_code, message, response=response diff --git a/src/glassflow/etl/errors.py b/src/glassflow/etl/errors.py index 5292103..b903440 100644 --- a/src/glassflow/etl/errors.py +++ b/src/glassflow/etl/errors.py @@ -40,6 +40,17 @@ class FeatureNotLicensedError(ForbiddenError): ForbiddenError so existing 403 handling still catches it.""" +class ConflictError(APIError): + """Raised on 409 Conflict errors.""" + + +class PipelineNotRunningError(ConflictError): + """Raised when an operation requires a Running pipeline but the pipeline is + in another state (the API responds 409). For example, DLQ reprocessing + replays messages through the running pipeline and is rejected when the + pipeline is stopped, terminated, or failed.""" + + class UnprocessableContentError(APIError): """Raised on 422 Unprocessable Content errors.""" diff --git a/tests/test_ee_dlq.py b/tests/test_ee_dlq.py index 591894d..250430a 100644 --- a/tests/test_ee_dlq.py +++ b/tests/test_ee_dlq.py @@ -180,3 +180,36 @@ def test_forbidden_maps_to_feature_not_licensed(self, ee_dlq): assert "Enterprise" in str(exc_info.value) # Still catchable as a ForbiddenError by existing 403 handling. assert isinstance(exc_info.value, errors.ForbiddenError) + + +class TestPipelineState: + def _conflict_patch(self): + mock_response = mock_responses.create_mock_response_factory()( + status_code=409, json_data={"message": "pipeline is not running"} + ) + return patch( + "httpx.Client.request", + side_effect=mock_response.raise_for_status.side_effect, + ) + + def test_reprocess_on_non_running_raises_pipeline_not_running(self, ee_dlq): + with self._conflict_patch(): + with pytest.raises(errors.PipelineNotRunningError) as exc_info: + ee_dlq.reprocess(["seq_1"]) + + assert "Running" in str(exc_info.value) + # Still catchable as the generic 409 ConflictError. + assert isinstance(exc_info.value, errors.ConflictError) + + def test_reprocess_all_on_non_running_raises_pipeline_not_running(self, ee_dlq): + with self._conflict_patch(): + with pytest.raises(errors.PipelineNotRunningError): + ee_dlq.reprocess_all() + + def test_discard_409_stays_conflict_error(self, ee_dlq): + # Discard has no Running-state constraint, so a 409 is not remapped. + with self._conflict_patch(): + with pytest.raises(errors.ConflictError) as exc_info: + ee_dlq.discard_all() + + assert not isinstance(exc_info.value, errors.PipelineNotRunningError) From b3c5f95b1122b038e35a46bb35d15fac4e8bf0e7 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Fri, 12 Jun 2026 15:39:53 +0200 Subject: [PATCH 05/16] docs(readme): document the Enterprise client and DLQ message processing Add an Enterprise Edition section covering the drop-in `glassflow.ee` client and DLQ management (list / reprocess / discard), including the FeatureNotLicensedError and PipelineNotRunningError behaviors and the 1000-id limit. Add a Features bullet for discoverability. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/README.md b/README.md index 2b3bea5..4feac2d 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ A Python SDK for creating and managing data pipelines between Kafka and ClickHou - Pipeline configuration via YAML or JSON - Schema validation and configuration management - Fine-grained resource control per pipeline component +- Enterprise Edition client (`glassflow.ee`) with DLQ reprocessing and discard ## Installation @@ -157,6 +158,44 @@ client.delete_pipeline("my-pipeline-id") pipeline.delete() ``` +## Enterprise Edition + +The GlassFlow Enterprise Edition adds capabilities on top of the Open Source engine. The SDK exposes them through a drop-in client that extends the Open Source one. Import `Client` from `glassflow.ee` instead of `glassflow.etl`: + +```python +from glassflow.ee import Client + +client = Client(host="your-glassflow-etl-url") +``` + +The Enterprise client does everything the Open Source client does, plus the Enterprise-only features below. Entitlement is enforced by the backend: calling an Enterprise-only operation against a backend that is not licensed for it raises `FeatureNotLicensedError`. + +### DLQ message processing + +When a pipeline component fails to process a message, that message lands in the pipeline's dead-letter queue (DLQ). On the Enterprise client, `pipeline.dlq` adds message management on top of the Open Source `state`, `consume`, and `purge`: + +- `list(batch_size, cursor)`: non-destructive paginated read. Each message includes a stable `message_id`, plus its `source` and `received_at`. +- `reprocess(message_ids)` / `reprocess_all()`: move messages back into the pipeline input to be processed again. +- `discard(message_ids)` / `discard_all()`: permanently remove messages. + +```python +pipeline = client.get_pipeline("my-pipeline-id") + +# Inspect failed messages +messages = pipeline.dlq.list(batch_size=50) +ids = [m["message_id"] for m in messages] + +# Retry them after fixing the underlying issue +pipeline.dlq.reprocess(ids) # or pipeline.dlq.reprocess_all() + +# Or drop the ones you do not want +pipeline.dlq.discard(["seq_200"]) # or pipeline.dlq.discard_all() +``` + +Reprocessing replays messages through the running pipeline, so the pipeline must be in the `Running` state. Calling `reprocess` on a stopped, terminated, or failed pipeline raises `PipelineNotRunningError`. Discard acts on the queue directly and works in any state. + +`reprocess` and `discard` accept at most 1000 `message_id` values per call. For larger sets, use the `*_all` variants. See the [DLQ documentation](https://docs.glassflow.dev/configuration/dlq) for the full reference. + ## Migrating from V2 to V3 Pipeline version `v2` has been removed. Use `Client.migrate_pipeline_v2_to_v3()` to convert an existing configuration automatically: From f04afa8eca2986916e9b5f7fa61ac3bd943cbfcb Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Mon, 15 Jun 2026 17:11:08 +0200 Subject: [PATCH 06/16] feat(ee): add DLQ component filter, list pagination, and list_iter Align the EE DLQ list endpoint with glassflow-etl-ee#35 (ETL-1200) and make iterating messages ergonomic: - list() gains a `component` filter (ingestor/join/sink/dedup/oltp-receiver) and now returns the paginated envelope {messages, next_cursor, has_more} instead of a bare list; batch_size max raised to the backend's 1000. - Add list_iter(), a lazy generator that pages via the cursor and yields individual messages, so callers do not manage the cursor by hand. - consume() (deprecated) unwraps the new envelope to keep its legacy list shape. Update tests and the README accordingly. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 8 ++-- src/glassflow/ee/dlq.py | 73 +++++++++++++++++++++++++++-------- tests/test_ee_dlq.py | 85 +++++++++++++++++++++++++++++++++-------- 3 files changed, 132 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 4feac2d..18a7f94 100644 --- a/README.md +++ b/README.md @@ -174,16 +174,16 @@ The Enterprise client does everything the Open Source client does, plus the Ente When a pipeline component fails to process a message, that message lands in the pipeline's dead-letter queue (DLQ). On the Enterprise client, `pipeline.dlq` adds message management on top of the Open Source `state`, `consume`, and `purge`: -- `list(batch_size, cursor)`: non-destructive paginated read. Each message includes a stable `message_id`, plus its `source` and `received_at`. +- `list(batch_size, cursor, component)`: non-destructive paginated read. Returns a page dict with `messages` (each carrying a stable `message_id`, `component`, `error`, `original_message`, and `received_at`), `has_more`, and `next_cursor`. Pass `component` to filter to a single component (`ingestor`, `join`, `sink`, `dedup`, `oltp-receiver`), and pass `next_cursor` back as `cursor` to page. +- `list_iter(batch_size, component)`: lazily iterate over every message, paging via the cursor for you. Yields individual messages, so you do not manage the cursor by hand. - `reprocess(message_ids)` / `reprocess_all()`: move messages back into the pipeline input to be processed again. - `discard(message_ids)` / `discard_all()`: permanently remove messages. ```python pipeline = client.get_pipeline("my-pipeline-id") -# Inspect failed messages -messages = pipeline.dlq.list(batch_size=50) -ids = [m["message_id"] for m in messages] +# Inspect failed messages from the sink only (paged automatically) +ids = [m["message_id"] for m in pipeline.dlq.list_iter(component="sink")] # Retry them after fixing the underlying issue pipeline.dlq.reprocess(ids) # or pipeline.dlq.reprocess_all() diff --git a/src/glassflow/ee/dlq.py b/src/glassflow/ee/dlq.py index bbf53c0..9110a06 100644 --- a/src/glassflow/ee/dlq.py +++ b/src/glassflow/ee/dlq.py @@ -1,7 +1,7 @@ from __future__ import annotations import warnings -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Iterator, List, Optional from glassflow.etl import errors from glassflow.etl.dlq import DLQ as _OSSDLQ @@ -25,46 +25,89 @@ class DLQ(_OSSDLQ): """ def list( - self, batch_size: int = 100, cursor: Optional[str] = None - ) -> List[Dict[str, Any]]: + self, + batch_size: int = 100, + cursor: Optional[str] = None, + component: Optional[str] = None, + ) -> Dict[str, Any]: """Read messages from the DLQ without removing them. - The non-destructive successor to :meth:`consume`. Each message carries a - stable ``message_id`` (NATS sequence number, as a string) plus - ``source``, ``component``, ``error``, ``original_message`` and + The non-destructive successor to :meth:`consume`. Each message in the + returned page carries a stable ``message_id`` (NATS sequence number, as + a string) plus ``component``, ``error``, ``original_message`` and ``received_at``. The ``message_id`` values are what :meth:`reprocess` and :meth:`discard` act on in ``mode=selected``. Args: - batch_size: Number of messages to read (between 1 and 100). - cursor: Opaque pagination cursor from a previous page; omit for the - first page. + batch_size: Number of messages per page (between 1 and 1000). + cursor: NATS sequence to resume from, taken from the previous page's + ``next_cursor``; omit for the first page. + component: Filter to messages from a single data-plane component, + one of ``ingestor``, ``join``, ``sink``, ``dedup``, + ``oltp-receiver``; omit for all components. Returns: - List of DLQ message dicts. + A dict with ``messages`` (list of message dicts), ``has_more`` + (bool), and ``next_cursor`` (str, present when ``has_more`` is + true; pass it back as ``cursor`` to fetch the next page). Raises: ValueError: If ``batch_size`` is invalid. - APIError: If the API request fails. + APIError: If the API request fails (e.g. an unknown ``component``). """ if ( not isinstance(batch_size, int) or batch_size < 1 - or batch_size > self._max_batch_size + or batch_size > MAX_SELECTED_MESSAGE_IDS ): raise ValueError( - f"batch_size must be an integer between 1 and {self._max_batch_size}" + f"batch_size must be an integer between 1 and " + f"{MAX_SELECTED_MESSAGE_IDS}" ) params: Dict[str, Any] = {"batch_size": batch_size} if cursor is not None: params["cursor"] = cursor + if component is not None: + params["component"] = component response = self._request("GET", f"{self.endpoint}/list", params=params) if response.status_code == 204 or not response.content: - return [] + return {"messages": [], "has_more": False} return response.json() + def list_iter( + self, + batch_size: int = 100, + component: Optional[str] = None, + cursor: Optional[str] = None, + ) -> Iterator[Dict[str, Any]]: + """Lazily iterate over every DLQ message, paging via the cursor for you. + + The streaming companion to :meth:`list`: it calls :meth:`list` page by + page and yields each message, so callers do not manage the cursor + themselves. Memory stays flat (one page at a time) and it composes with + ``itertools`` (e.g. ``itertools.islice`` for the first N). + + Args: + batch_size: Messages fetched per underlying page request (1-1000). + component: Optional component filter; see :meth:`list`. + cursor: Optional starting cursor (e.g. to resume a previous run); + omit to start at the beginning of the DLQ. + + Yields: + Individual DLQ message dicts. + """ + while True: + page = self.list(batch_size=batch_size, cursor=cursor, component=component) + yield from page.get("messages", []) + if not page.get("has_more"): + return + cursor = page.get("next_cursor") + # Defensive: a truthy has_more without a cursor would loop forever. + if not cursor: + return + def reprocess(self, message_ids: List[str]) -> Dict[str, Any]: """Move specific messages from the DLQ back into the pipeline input. @@ -127,7 +170,7 @@ def consume(self, batch_size: int = 100) -> List[Dict[str, Any]]: DeprecationWarning, stacklevel=2, ) - return self.list(batch_size=batch_size) + return self.list(batch_size=batch_size).get("messages", []) def purge(self) -> None: """Deprecated: use :meth:`discard_all`. diff --git a/tests/test_ee_dlq.py b/tests/test_ee_dlq.py index 250430a..57c3efe 100644 --- a/tests/test_ee_dlq.py +++ b/tests/test_ee_dlq.py @@ -45,16 +45,19 @@ def test_get_pipeline_dlq_is_ee( class TestList: def test_list_success(self, ee_dlq, mock_success): - payload = [ - { - "message_id": "seq_101", - "source": "source-0", - "component": "sink", - "error": "connection refused", - "original_message": "{}", - "received_at": "2026-05-29T14:00:00Z", - } - ] + payload = { + "messages": [ + { + "message_id": "seq_101", + "component": "sink", + "error": "connection refused", + "original_message": "{}", + "received_at": "2026-05-29T14:00:00Z", + } + ], + "next_cursor": "seq_101", + "has_more": True, + } with mock_success(json_payloads=[payload]) as mock_get: result = ee_dlq.list(batch_size=50) @@ -62,29 +65,79 @@ def test_list_success(self, ee_dlq, mock_success): "GET", f"{ee_dlq.endpoint}/list", params={"batch_size": 50} ) assert result == payload + assert result["messages"][0]["message_id"] == "seq_101" def test_list_with_cursor(self, ee_dlq, mock_success): - with mock_success(json_payloads=[[]]) as mock_get: + with mock_success(json_payloads=[{"messages": [], "has_more": False}]) as m: ee_dlq.list(batch_size=10, cursor="seq_200") - mock_get.assert_called_once_with( + m.assert_called_once_with( "GET", f"{ee_dlq.endpoint}/list", params={"batch_size": 10, "cursor": "seq_200"}, ) + def test_list_with_component_filter(self, ee_dlq, mock_success): + with mock_success(json_payloads=[{"messages": [], "has_more": False}]) as m: + ee_dlq.list(batch_size=10, component="sink") + m.assert_called_once_with( + "GET", + f"{ee_dlq.endpoint}/list", + params={"batch_size": 10, "component": "sink"}, + ) + def test_list_empty_on_204(self, ee_dlq): mock_response = mock_responses.create_mock_response_factory()( status_code=204, json_data=None ) with patch("httpx.Client.request", return_value=mock_response): - assert ee_dlq.list() == [] + assert ee_dlq.list() == {"messages": [], "has_more": False} - @pytest.mark.parametrize("bad", [0, 101, -1, "10"]) + @pytest.mark.parametrize("bad", [0, 1001, -1, "10"]) def test_list_invalid_batch_size(self, ee_dlq, bad): with pytest.raises(ValueError, match="batch_size must be an integer"): ee_dlq.list(batch_size=bad) +class TestListIter: + def test_pages_through_and_advances_cursor(self, ee_dlq, mock_success): + page1 = { + "messages": [{"message_id": "1"}, {"message_id": "2"}], + "next_cursor": "2", + "has_more": True, + } + page2 = {"messages": [{"message_id": "3"}], "has_more": False} + with mock_success(json_payloads=[page1, page2]) as mock_get: + ids = [m["message_id"] for m in ee_dlq.list_iter(batch_size=2)] + + assert ids == ["1", "2", "3"] + assert mock_get.call_count == 2 + # Second page resumes from the first page's next_cursor. + assert mock_get.call_args_list[1].kwargs["params"] == { + "batch_size": 2, + "cursor": "2", + } + + def test_forwards_component_and_is_lazy(self, ee_dlq, mock_success): + page = {"messages": [{"message_id": "1"}], "has_more": False} + with mock_success(json_payloads=[page]) as mock_get: + it = ee_dlq.list_iter(component="sink") + # Generator is lazy: no request until first iteration. + assert mock_get.call_count == 0 + next(it) + assert mock_get.call_args_list[0].kwargs["params"] == { + "batch_size": 100, + "component": "sink", + } + + def test_stops_when_has_more_without_cursor(self, ee_dlq, mock_success): + # Defensive guard: truthy has_more but no next_cursor must not loop. + page = {"messages": [{"message_id": "1"}], "has_more": True} + with mock_success(json_payloads=[page]) as mock_get: + ids = [m["message_id"] for m in ee_dlq.list_iter()] + assert ids == ["1"] + assert mock_get.call_count == 1 + + class TestReprocess: def test_reprocess_selected(self, ee_dlq, mock_success): with mock_success( @@ -147,7 +200,8 @@ def test_discard_empty_ids_raises(self, ee_dlq): class TestDeprecatedInherited: def test_consume_warns_and_delegates_to_list(self, ee_dlq, mock_success): - with mock_success(json_payloads=[[{"message_id": "seq_1"}]]) as mock_get: + envelope = {"messages": [{"message_id": "seq_1"}], "has_more": False} + with mock_success(json_payloads=[envelope]) as mock_get: with pytest.warns(DeprecationWarning, match="use DLQ.list"): result = ee_dlq.consume(batch_size=25) @@ -155,6 +209,7 @@ def test_consume_warns_and_delegates_to_list(self, ee_dlq, mock_success): mock_get.assert_called_once_with( "GET", f"{ee_dlq.endpoint}/list", params={"batch_size": 25} ) + # consume() unwraps the envelope to the legacy list shape. assert result == [{"message_id": "seq_1"}] def test_purge_warns_and_hits_purge_endpoint(self, ee_dlq, mock_success): From 2e7ed6d4cd082a2d8867c6f201061cf1cda9e4d3 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Mon, 15 Jun 2026 18:19:18 +0200 Subject: [PATCH 07/16] feat(ee): validate DLQ list component filter client-side Add DLQ_COMPONENTS and reject an unknown `component` in list() with a ValueError before the request, instead of relying only on the server's 422. Verified against the live staging EE API (glassflow-etl-ee#35): valid components accepted, unknown rejected, and reprocess on a stopped pipeline returns 409 -> PipelineNotRunningError as mapped. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/glassflow/ee/dlq.py | 17 ++++++++++++----- tests/test_ee_dlq.py | 15 ++++++++++++--- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/glassflow/ee/dlq.py b/src/glassflow/ee/dlq.py index 9110a06..b6aa3a3 100644 --- a/src/glassflow/ee/dlq.py +++ b/src/glassflow/ee/dlq.py @@ -11,6 +11,10 @@ # fast, offline error instead of a round-trip 400. MAX_SELECTED_MESSAGE_IDS = 1000 +# Data-plane components a DLQ message can come from (DataPlaneRoles in +# glassflow-etl-ee). Used to validate the list() component filter client-side. +DLQ_COMPONENTS = ("ingestor", "join", "sink", "dedup", "oltp-receiver") + class DLQ(_OSSDLQ): """Enterprise Dead Letter Queue client. @@ -42,9 +46,9 @@ def list( batch_size: Number of messages per page (between 1 and 1000). cursor: NATS sequence to resume from, taken from the previous page's ``next_cursor``; omit for the first page. - component: Filter to messages from a single data-plane component, - one of ``ingestor``, ``join``, ``sink``, ``dedup``, - ``oltp-receiver``; omit for all components. + component: Filter to messages from a single data-plane component; + must be one of :data:`DLQ_COMPONENTS` (``ingestor``, ``join``, + ``sink``, ``dedup``, ``oltp-receiver``); omit for all components. Returns: A dict with ``messages`` (list of message dicts), ``has_more`` @@ -52,8 +56,8 @@ def list( true; pass it back as ``cursor`` to fetch the next page). Raises: - ValueError: If ``batch_size`` is invalid. - APIError: If the API request fails (e.g. an unknown ``component``). + ValueError: If ``batch_size`` or ``component`` is invalid. + APIError: If the API request fails. """ if ( not isinstance(batch_size, int) @@ -65,6 +69,9 @@ def list( f"{MAX_SELECTED_MESSAGE_IDS}" ) + if component is not None and component not in DLQ_COMPONENTS: + raise ValueError(f"component must be one of {', '.join(DLQ_COMPONENTS)}") + params: Dict[str, Any] = {"batch_size": batch_size} if cursor is not None: params["cursor"] = cursor diff --git a/tests/test_ee_dlq.py b/tests/test_ee_dlq.py index 57c3efe..8d53dc6 100644 --- a/tests/test_ee_dlq.py +++ b/tests/test_ee_dlq.py @@ -76,15 +76,24 @@ def test_list_with_cursor(self, ee_dlq, mock_success): params={"batch_size": 10, "cursor": "seq_200"}, ) - def test_list_with_component_filter(self, ee_dlq, mock_success): + @pytest.mark.parametrize( + "component", ["ingestor", "join", "sink", "dedup", "oltp-receiver"] + ) + def test_list_with_valid_component_filter(self, ee_dlq, mock_success, component): with mock_success(json_payloads=[{"messages": [], "has_more": False}]) as m: - ee_dlq.list(batch_size=10, component="sink") + ee_dlq.list(batch_size=10, component=component) m.assert_called_once_with( "GET", f"{ee_dlq.endpoint}/list", - params={"batch_size": 10, "component": "sink"}, + params={"batch_size": 10, "component": component}, ) + @pytest.mark.parametrize("bad", ["otlp-receiver", "Sink", "", "transform"]) + def test_list_invalid_component_raises_client_side(self, ee_dlq, bad): + # Validated client-side before any HTTP call (no mock needed). + with pytest.raises(ValueError, match="component must be one of"): + ee_dlq.list(component=bad) + def test_list_empty_on_204(self, ee_dlq): mock_response = mock_responses.create_mock_response_factory()( status_code=204, json_data=None From 75f090897a003d303255eb71b8d2fbb9b0e587c7 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 15 Jun 2026 16:40:05 +0000 Subject: [PATCH 08/16] docs: update coverage badge --- README.md | 2 +- coverage.xml | 131 +++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 108 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 18a7f94..e99ffdb 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ - +

diff --git a/coverage.xml b/coverage.xml index bb216f4..c8c1e81 100644 --- a/coverage.xml +++ b/coverage.xml @@ -1,5 +1,5 @@ - + @@ -7,15 +7,16 @@ /home/runner/work/glassflow-python-sdk/glassflow-python-sdk/src/glassflow - + - - + + + @@ -28,17 +29,94 @@
- + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
- + @@ -50,7 +128,7 @@ - + @@ -112,11 +190,13 @@ - - - - + + + + + + @@ -233,21 +313,24 @@ - - - - - - - - - + + + + + + + + - - - - + + + + + + + + From 5ee4afd0ec32d124659625e7be9564137505fa99 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Tue, 16 Jun 2026 14:11:34 +0200 Subject: [PATCH 09/16] feat(models): add Avro and Protobuf format support to Kafka source (ETL-1197) Add `format` (json/avro/protobuf) and a unified `schema` to the Kafka source, matching the agreed backend contract. - KafkaSchema groups all schema config: `fields` (json), `avsc` (Avro record), `proto` + `message` (protobuf), and read-only `parsed_fields` (returned by the backend on GET, never sent back and not used in SDK logic). AvroSchema enforces the structure GlassFlow needs to map to ClickHouse: a `record` with a `name` and non-empty `fields`; extra Avro keys are preserved. Full Avro correctness is left to the backend. - Compatibility across editions: input accepts both a top-level `schema_fields` (Open Source, and Enterprise for POST compat) and the unified `schema` object (Enterprise). On output, json serializes to top-level `schema_fields` (works on both editions) and avro/protobuf to the `schema` object. A `schema_fields` read property keeps the dedup/join validators and existing callers working. - Replace the speculative tagged-format foundation (formats.py + the format registry); the source-type registry is unchanged. Surface backend validation detail: APIError now carries the structured `details`, and the create/edit path composes details.error (e.g. a proto/avro compilation error) into PipelineInvalidConfigurationError. avro/protobuf are Enterprise features enforced by the backend; the SDK models them on the shared OSS source so OSS clients can also round-trip such configs. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/glassflow/etl/api_client.py | 47 ++- src/glassflow/etl/errors.py | 6 +- src/glassflow/etl/models/__init__.py | 10 +- src/glassflow/etl/models/registry.py | 24 +- src/glassflow/etl/models/sources/__init__.py | 10 +- src/glassflow/etl/models/sources/formats.py | 46 --- src/glassflow/etl/models/sources/kafka.py | 180 +++++++++-- src/glassflow/etl/pipeline.py | 10 +- tests/test_models/test_source_formats.py | 313 +++++++++++-------- 9 files changed, 399 insertions(+), 247 deletions(-) delete mode 100644 src/glassflow/etl/models/sources/formats.py diff --git a/src/glassflow/etl/api_client.py b/src/glassflow/etl/api_client.py index 701a874..362dfcc 100644 --- a/src/glassflow/etl/api_client.py +++ b/src/glassflow/etl/api_client.py @@ -67,66 +67,81 @@ def _raise_api_error(response: httpx.Response) -> None: error_data = response.json() message = error_data.get("message", None) code = error_data.get("code", None) + details = error_data.get("details", None) except json.JSONDecodeError: message = f"{status_code} {response.reason_phrase}" code = None + details = None error_data = {} if status_code == 400: # Handle specific status validation error codes if code == "TERMINAL_STATE_VIOLATION": raise errors.TerminalStateViolationError( - status_code, message, response=response + status_code, message, response=response, details=details ) elif code == "INVALID_STATUS_TRANSITION": raise errors.InvalidStatusTransitionError( - status_code, - message, - response=response, + status_code, message, response=response, details=details ) elif code == "UNKNOWN_STATUS": - raise errors.UnknownStatusError(status_code, message, response=response) + raise errors.UnknownStatusError( + status_code, message, response=response, details=details + ) elif code == "PIPELINE_ALREADY_IN_STATE": raise errors.PipelineAlreadyInStateError( - status_code, message, response=response + status_code, message, response=response, details=details ) elif code == "PIPELINE_IN_TRANSITION": raise errors.PipelineInTransitionError( - status_code, message, response=response + status_code, message, response=response, details=details ) elif message and message.startswith("invalid json:"): - raise errors.InvalidJsonError(status_code, message, response=response) + raise errors.InvalidJsonError( + status_code, message, response=response, details=details + ) elif message and message == "pipeline id cannot be empty": raise errors.EmptyPipelineIdError( - status_code, message, response=response + status_code, message, response=response, details=details ) elif message and message.startswith( "pipeline can only be deleted if it's stopped or terminated" ): raise errors.PipelineDeletionStateViolationError( - status_code, message, response=response + status_code, message, response=response, details=details ) else: # Generic 400 error for unknown codes - raise errors.ValidationError(status_code, message, response=response) + raise errors.ValidationError( + status_code, message, response=response, details=details + ) elif status_code == 403: - raise errors.ForbiddenError(status_code, message, response=response) + raise errors.ForbiddenError( + status_code, message, response=response, details=details + ) elif status_code == 404: - raise errors.NotFoundError(status_code, message, response=response) + raise errors.NotFoundError( + status_code, message, response=response, details=details + ) elif status_code == 409: - raise errors.ConflictError(status_code, message, response=response) + raise errors.ConflictError( + status_code, message, response=response, details=details + ) elif status_code == 422: raise errors.UnprocessableContentError( - status_code, message, response=response + status_code, message, response=response, details=details ) elif status_code == 500: - raise errors.ServerError(status_code, message, response=response) + raise errors.ServerError( + status_code, message, response=response, details=details + ) else: raise errors.APIError( status_code, message="An error occurred: " f"({status_code} {response.reason_phrase}) {message}", response=response, + details=details, ) def _track_event(self, event_name: str, **kwargs: Any) -> None: diff --git a/src/glassflow/etl/errors.py b/src/glassflow/etl/errors.py index b903440..b92f015 100644 --- a/src/glassflow/etl/errors.py +++ b/src/glassflow/etl/errors.py @@ -15,10 +15,14 @@ class ConnectionError(RequestError): class APIError(GlassFlowError): """Base for API response errors.""" - def __init__(self, status_code, message=None, response=None): + def __init__(self, status_code, message=None, response=None, details=None): self.status_code = status_code self.response = response self.message = message + # The API's structured ``details`` object, when present. For invalid + # configs it carries the specific cause under ``details["error"]`` (for + # example an Avro/Protobuf schema compilation error). + self.details = details or {} super().__init__(self.message) diff --git a/src/glassflow/etl/models/__init__.py b/src/glassflow/etl/models/__init__.py index 54df304..194a583 100644 --- a/src/glassflow/etl/models/__init__.py +++ b/src/glassflow/etl/models/__init__.py @@ -27,13 +27,15 @@ from .source import SourceBaseConfig, SourceBaseConfigPatch, SourceType from .sources import ( AnySource, + AvroSchema, ConsumerGroupOffset, - JsonFormat, KafkaConnectionParams, KafkaConnectionParamsPatch, KafkaField, + KafkaFormat, KafkaMechanism, KafkaProtocol, + KafkaSchema, KafkaSource, KafkaSourcePatch, OTLPLogsSource, @@ -42,7 +44,6 @@ OTLPTracesSource, SchemaRegistry, SourceConfig, - SourceFormat, ) from .transforms import ( DedupTransform, @@ -64,6 +65,7 @@ __all__ = [ "AnySource", + "AvroSchema", "ClickhouseConnectionParams", "ClickhouseConnectionParamsPatch", "ClickhouseDataType", @@ -77,15 +79,16 @@ "JoinConfig", "JoinConfigPatch", "JoinOutputField", - "JsonFormat", "JoinSourceConfig", "JoinType", "KafkaConnectionParams", "KafkaConnectionParamsPatch", "KafkaDataType", "KafkaField", + "KafkaFormat", "KafkaMechanism", "KafkaProtocol", + "KafkaSchema", "KafkaSource", "KafkaSourcePatch", "MetadataConfig", @@ -109,7 +112,6 @@ "SourceBaseConfig", "SourceBaseConfigPatch", "SourceConfig", - "SourceFormat", "SourceResourceEntry", "SourceType", "StatelessTransform", diff --git a/src/glassflow/etl/models/registry.py b/src/glassflow/etl/models/registry.py index 14b436c..7ed65da 100644 --- a/src/glassflow/etl/models/registry.py +++ b/src/glassflow/etl/models/registry.py @@ -1,17 +1,15 @@ -"""Extensible registries for source types and source formats. +"""Extensible registry for source types. -Source types (``kafka``, ``otlp.logs``, ...) and source formats (``json``, -and Enterprise ``avro``/``protobuf``) are open-ended: the Enterprise SDK adds +Source types (``kafka``, ``otlp.logs``, ...) are open-ended: an edition can add new ones without modifying this package. Instead of a static Pydantic discriminated union (whose members are frozen at class-definition time), models -annotate their fields with the base type and dispatch each value to the -concrete class by its ``type`` string at validation time, looked up here. +annotate their field with the base type and dispatch each value to the concrete +class by its ``type`` string at validation time, looked up here. Editions register their classes at import:: - from glassflow.etl.models.registry import register_source, register_format + from glassflow.etl.models.registry import register_source register_source(KinesisSource) - register_format(AvroFormat) """ from __future__ import annotations @@ -23,7 +21,6 @@ _T = TypeVar("_T", bound=BaseModel) _SOURCE_CLASSES: Dict[str, Type[BaseModel]] = {} -_FORMAT_CLASSES: Dict[str, Type[BaseModel]] = {} def _type_key(cls: Type[BaseModel]) -> str: @@ -44,13 +41,6 @@ def register_source(cls: Type[_T]) -> Type[_T]: return cls -def register_format(cls: Type[_T]) -> Type[_T]: - """Register a source format class, keyed by its ``type`` default. Usable as - a decorator.""" - _FORMAT_CLASSES[_type_key(cls)] = cls - return cls - - def _resolve(value: Any, registry: Dict[str, Type[BaseModel]], kind: str) -> Any: """Coerce a raw dict to the registered concrete model by its ``type``. @@ -69,7 +59,3 @@ def _resolve(value: Any, registry: Dict[str, Type[BaseModel]], kind: str) -> Any def resolve_source(value: Any) -> Any: return _resolve(value, _SOURCE_CLASSES, "source") - - -def resolve_format(value: Any) -> Any: - return _resolve(value, _FORMAT_CLASSES, "format") diff --git a/src/glassflow/etl/models/sources/__init__.py b/src/glassflow/etl/models/sources/__init__.py index 6a4b2f3..8b62a29 100644 --- a/src/glassflow/etl/models/sources/__init__.py +++ b/src/glassflow/etl/models/sources/__init__.py @@ -16,14 +16,16 @@ from ..registry import register_source from ..source import SourceBaseConfig, SourceBaseConfigPatch, SourceType -from .formats import JsonFormat, SourceFormat from .kafka import ( + AvroSchema, ConsumerGroupOffset, KafkaConnectionParams, KafkaConnectionParamsPatch, KafkaField, + KafkaFormat, KafkaMechanism, KafkaProtocol, + KafkaSchema, KafkaSource, KafkaSourcePatch, SchemaRegistry, @@ -66,12 +68,15 @@ "SourceBaseConfig", "SourceBaseConfigPatch", # Kafka + "AvroSchema", "ConsumerGroupOffset", "KafkaConnectionParams", "KafkaConnectionParamsPatch", "KafkaField", + "KafkaFormat", "KafkaMechanism", "KafkaProtocol", + "KafkaSchema", "KafkaSource", "KafkaSourcePatch", "SchemaRegistry", @@ -81,9 +86,6 @@ "OTLPSource", "OTLPSourcePatch", "OTLPTracesSource", - # Formats - "SourceFormat", - "JsonFormat", # Union "AnySource", "SourceConfig", diff --git a/src/glassflow/etl/models/sources/formats.py b/src/glassflow/etl/models/sources/formats.py deleted file mode 100644 index aed8c28..0000000 --- a/src/glassflow/etl/models/sources/formats.py +++ /dev/null @@ -1,46 +0,0 @@ -"""Source format models. - -A source's ``format`` describes how its payload is encoded and how the schema -is supplied. It is a tagged object discriminated by ``type`` (``json`` in OSS; -``avro``/``protobuf`` in Enterprise), with format-specific configuration nested -under a key named after the type, e.g.:: - - {"type": "protobuf", "protobuf": {"schema_source": "inline", "proto_text": "..."}} - -New formats are added by subclassing :class:`SourceFormat` and registering them -with :func:`glassflow.etl.models.registry.register_format`; no change to this -package is required. -""" - -from __future__ import annotations - -from typing import Literal - -from pydantic import BaseModel - -from ..registry import register_format - - -class SourceFormat(BaseModel): - """Base class for all source formats.""" - - type: str - - def validate_against_registry(self, has_schema_registry: bool) -> None: - """Hook for formats to enforce schema-source rules that depend on the - parent source's schema registry configuration. - - Called by the source's after-validator with whether the source has a - schema registry configured. The base implementation is a no-op; - formats that can read their schema from a registry (e.g. Enterprise - avro/protobuf) override this to require that the registry is present - when ``schema_source`` is ``"registry"``. - """ - return None - - -@register_format -class JsonFormat(SourceFormat): - """JSON-encoded payloads. This is the default and carries no extra config.""" - - type: Literal["json"] = "json" diff --git a/src/glassflow/etl/models/sources/kafka.py b/src/glassflow/etl/models/sources/kafka.py index fec07aa..04ef287 100644 --- a/src/glassflow/etl/models/sources/kafka.py +++ b/src/glassflow/etl/models/sources/kafka.py @@ -1,14 +1,24 @@ """Kafka source models.""" -from typing import Any, List, Literal, Optional +from typing import Any, Dict, List, Literal, Optional -from pydantic import BaseModel, Field, SerializeAsAny, field_validator, model_validator +from pydantic import ( + BaseModel, + ConfigDict, + Field, + model_serializer, + model_validator, +) from ..base import CaseInsensitiveStrEnum from ..data_types import KafkaDataType -from ..registry import resolve_format from ..source import SourceBaseConfig, SourceBaseConfigPatch, SourceType -from .formats import SourceFormat + + +class KafkaFormat(CaseInsensitiveStrEnum): + JSON = "json" + AVRO = "avro" + PROTOBUF = "protobuf" class KafkaProtocol(CaseInsensitiveStrEnum): @@ -46,6 +56,44 @@ class KafkaField(BaseModel): type: KafkaDataType +class AvroSchema(BaseModel): + """The Avro schema record (``schema.avsc``) for an Avro Kafka source. + + GlassFlow maps the record's root-level fields to ClickHouse columns, so the + top-level schema must be an Avro ``record`` with a ``name`` and a non-empty + ``fields`` list. Individual field ``type`` values may themselves be nested + Avro schemas. Other Avro keys (``namespace``, ``doc``, ``aliases``, ...) are + accepted and preserved on round-trip. + """ + + model_config = ConfigDict(extra="allow") + + type: Literal["record"] + name: str + fields: List[Dict[str, Any]] = Field(min_length=1) + namespace: Optional[str] = Field(default=None) + + +class KafkaSchema(BaseModel): + """Unified schema for a Kafka source. The shape used is selected by the + source's :class:`KafkaFormat`: + + - ``json`` -> ``fields`` (GlassFlow field declarations) + - ``avro`` -> ``avsc`` (the Avro schema record) + - ``protobuf`` -> ``proto`` (the ``.proto`` text) and ``message`` (the + message name within it) + + On reads the backend also returns ``parsed_fields``: the field list parsed + from the avsc/proto. It is read-only and is not sent back on create/edit. + """ + + fields: Optional[List[KafkaField]] = Field(default=None) + avsc: Optional[AvroSchema] = Field(default=None) + proto: Optional[str] = Field(default=None) + message: Optional[str] = Field(default=None) + parsed_fields: Optional[List[KafkaField]] = Field(default=None) + + class KafkaConnectionParams(BaseModel): brokers: List[str] protocol: KafkaProtocol @@ -74,6 +122,36 @@ def update(self, patch: "KafkaConnectionParamsPatch") -> "KafkaConnectionParams" return KafkaConnectionParams.model_validate(merged_dict) +def _parse_source_schema(data: Any) -> Any: + """Fold both wire schema shapes into the unified ``source_schema``: + + - top-level ``schema_fields`` (Open Source, and Enterprise for compatibility) + - the unified ``schema`` object (Enterprise: avsc / proto+message / + parsed_fields, and optionally fields) + + Also drops an empty ``schema_registry``. Left untouched if ``source_schema`` + is already supplied directly. + """ + if not isinstance(data, dict) or "source_schema" in data: + return data + data = dict(data) + if data.get("schema_registry", None) == {}: + data.pop("schema_registry", None) + + schema: Dict[str, Any] = {} + if "schema_fields" in data: + schema["fields"] = data.pop("schema_fields") + if "schema" in data: + wire = data.pop("schema") + if isinstance(wire, dict): + schema.update(wire) + else: + data["schema"] = wire # let validation reject a non-object schema + if schema: + data["source_schema"] = schema + return data + + class KafkaSource(SourceBaseConfig): """Kafka source configuration. @@ -81,31 +159,53 @@ class KafkaSource(SourceBaseConfig): and a single topic string. """ + model_config = ConfigDict(populate_by_name=True) + type: Literal[SourceType.KAFKA] = SourceType.KAFKA connection_params: KafkaConnectionParams topic: str consumer_group_initial_offset: ConsumerGroupOffset = ConsumerGroupOffset.LATEST schema_registry: Optional[SchemaRegistry] = Field(default=None) schema_version: Optional[str] = Field(default=None) - schema_fields: Optional[List[KafkaField]] = Field(default=None) - # Payload format. ``None`` means JSON (default) and is omitted from the - # serialized config. Concrete subclass is resolved from the format registry - # by its ``type``; SerializeAsAny preserves subclass-only fields on dump. - format: Optional[SerializeAsAny[SourceFormat]] = Field(default=None) + # Payload wire format. ``None`` means JSON (the backend default) and is + # omitted from the serialized config. ``avro`` and ``protobuf`` are + # Enterprise features and are rejected by an unlicensed backend. + format: Optional[KafkaFormat] = Field(default=None) + # All schema-related config in one place. Serialized back to the wire as + # top-level ``schema_fields`` (json, for Open Source / Enterprise compat) or + # the ``schema`` object (avro/protobuf); see the serializer below. + source_schema: Optional[KafkaSchema] = Field(default=None) @model_validator(mode="before") @classmethod - def validate_empty_schema_registry(cls, data: Any) -> Any: - if isinstance(data, dict): - if data.get("schema_registry", None) == {}: - data.pop("schema_registry", None) + def parse_schema(cls, data: Any) -> Any: + return _parse_source_schema(data) + + @model_serializer(mode="wrap") + def serialize_schema(self, handler: Any) -> Any: + """Emit json fields as top-level ``schema_fields`` (compatible with both + editions) and avro/protobuf as the ``schema`` object. ``parsed_fields`` + is read-only and is not emitted.""" + data = handler(self) + schema = data.pop("source_schema", None) + if schema: + if schema.get("fields") is not None: + data["schema_fields"] = schema["fields"] + inner = { + k: schema[k] + for k in ("avsc", "proto", "message") + if schema.get(k) is not None + } + if inner: + data["schema"] = inner return data - @field_validator("format", mode="before") - @classmethod - def resolve_format_field(cls, value: Any) -> Any: - """Dispatch a raw format dict to the concrete registered format class.""" - return resolve_format(value) + @property + def schema_fields(self) -> Optional[List[KafkaField]]: + """Backward-compatible accessor for the JSON field declarations, held + under ``schema.fields``. (``schema.parsed_fields`` is read-only backend + info and is intentionally not surfaced here.)""" + return self.source_schema.fields if self.source_schema else None @model_validator(mode="after") def validate_schema_registry_requires_version(self) -> "KafkaSource": @@ -117,11 +217,23 @@ def validate_schema_registry_requires_version(self) -> "KafkaSource": return self @model_validator(mode="after") - def validate_format(self) -> "KafkaSource": - """Let the format enforce schema-source rules against this source's - schema registry configuration.""" - if self.format is not None: - self.format.validate_against_registry(self.schema_registry is not None) + def validate_format_schema(self) -> "KafkaSource": + """The schema shape must match the declared format.""" + schema = self.source_schema + if self.format == KafkaFormat.AVRO: + if schema is None or schema.avsc is None: + raise ValueError("avro format requires schema.avsc") + elif self.format == KafkaFormat.PROTOBUF: + if schema is None or not schema.proto or not schema.message: + raise ValueError( + "protobuf format requires schema.proto and schema.message" + ) + elif schema is not None and ( + schema.avsc is not None or schema.proto or schema.message + ): + raise ValueError( + "schema.avsc / proto / message require format 'avro' or 'protobuf'" + ) return self def update(self, patch: "KafkaSourcePatch") -> "KafkaSource": @@ -136,8 +248,11 @@ def update(self, patch: "KafkaSourcePatch") -> "KafkaSource": if patch.topic is not None: update_dict.topic = patch.topic - if patch.schema_fields is not None: - update_dict.schema_fields = patch.schema_fields + if patch.format is not None: + update_dict.format = patch.format + + if patch.source_schema is not None: + update_dict.source_schema = patch.source_schema return update_dict @@ -162,6 +277,19 @@ class KafkaConnectionParamsPatch(BaseModel): class KafkaSourcePatch(SourceBaseConfigPatch): """Patch model for KafkaSource.""" + model_config = ConfigDict(populate_by_name=True) + connection_params: Optional[KafkaConnectionParamsPatch] = Field(default=None) topic: Optional[str] = Field(default=None) - schema_fields: Optional[List[KafkaField]] = Field(default=None) + format: Optional[KafkaFormat] = Field(default=None) + source_schema: Optional[KafkaSchema] = Field(default=None) + + @model_validator(mode="before") + @classmethod + def parse_schema(cls, data: Any) -> Any: + return _parse_source_schema(data) + + @property + def schema_fields(self) -> Optional[List[KafkaField]]: + """Backward-compatible accessor for ``schema.fields``.""" + return self.source_schema.fields if self.source_schema else None diff --git a/src/glassflow/etl/pipeline.py b/src/glassflow/etl/pipeline.py index 9ec4590..ae51586 100644 --- a/src/glassflow/etl/pipeline.py +++ b/src/glassflow/etl/pipeline.py @@ -448,9 +448,17 @@ def _request( ) from e except errors.UnprocessableContentError as e: self._track_event(event_name, error_type="InvalidPipelineConfig") + message = e.message or "Invalid pipeline configuration" + # The specific cause (e.g. an Avro/Protobuf schema compilation error) + # is in details.error; surface it instead of the generic message. + detail = e.details.get("error") if e.details else None + if detail: + message = f"{message}: {detail}" raise errors.PipelineInvalidConfigurationError( status_code=e.status_code, - message=e.message or "Invalid pipeline configuration", + message=message, + response=e.response, + details=e.details, ) from e except errors.APIError as e: self._track_event(event_name, error_type="InternalServerError") diff --git a/tests/test_models/test_source_formats.py b/tests/test_models/test_source_formats.py index add330f..6a70818 100644 --- a/tests/test_models/test_source_formats.py +++ b/tests/test_models/test_source_formats.py @@ -1,170 +1,226 @@ -"""Tests for source formats and the source/format registries. +"""Tests for Kafka source formats (json/avro/protobuf) and the source registry. -These prove the open-ended extension mechanism: an out-of-tree format or source -type (standing in for what the Enterprise SDK ships) plugs in via the registry -with no change to the OSS models, and SerializeAsAny preserves its fields on -dump. +The unified ``source_schema`` holds all schema config. It serializes json to a +top-level ``schema_fields`` (compatible with Open Source and Enterprise) and +avro/protobuf to the ``schema`` object (``avsc`` / ``proto``+``message``). On +reads the backend may also return ``schema.parsed_fields``. """ -from typing import Literal, Optional +from typing import Literal +from unittest.mock import patch import pytest -from pydantic import BaseModel, model_validator -from glassflow.etl import models +from glassflow.etl import errors, models from glassflow.etl.models import registry -from glassflow.etl.models.base import CaseInsensitiveStrEnum -from glassflow.etl.models.sources.formats import SourceFormat - -# --- Out-of-tree EE-like definitions (not part of OSS) ---------------------- +from tests.data import mock_responses + + +def _kafka(**overrides) -> dict: + base = { + "type": "kafka", + "source_id": "events", + "connection_params": {"brokers": ["b:9092"], "protocol": "PLAINTEXT"}, + "topic": "events", + "consumer_group_initial_offset": "earliest", + } + base.update(overrides) + return base + + +AVSC = { + "type": "record", + "name": "Event", + "namespace": "test", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "ts_ms", "type": "long"}, + ], +} +PROTO_TEXT = 'syntax = "proto3";\npackage test;\nmessage Event {\n string id = 1;\n}' -class SchemaSource(CaseInsensitiveStrEnum): - INLINE = "inline" - REGISTRY = "registry" +class TestJsonFormat: + def test_format_defaults_to_none_and_is_omitted(self): + src = models.KafkaSource.model_validate(_kafka()) + assert src.format is None + assert "format" not in src.model_dump(by_alias=True, exclude_none=True) + def test_top_level_schema_fields_round_trips(self): + wire = _kafka( + format="json", + schema_fields=[ + {"name": "id", "type": "string"}, + {"name": "ts_ms", "type": "int"}, + ], + ) + src = models.KafkaSource.model_validate(wire) + assert src.source_schema.fields[0].name == "id" + assert src.schema_fields[0].name == "id" # compat accessor + dumped = src.model_dump(by_alias=True, exclude_none=True) + # json always serializes to top-level schema_fields (OSS + EE compat) + assert dumped["schema_fields"] == wire["schema_fields"] + assert "schema" not in dumped + assert "source_schema" not in dumped -class ProtobufConfig(BaseModel): - schema_source: SchemaSource - proto_text: Optional[str] = None + def test_unified_schema_fields_input_also_accepted(self): + # EE may return json fields under the unified schema object. + src = models.KafkaSource.model_validate( + _kafka(format="json", schema={"fields": [{"name": "id", "type": "string"}]}) + ) + assert src.schema_fields[0].name == "id" + # still emits the compatible top-level form + dumped = src.model_dump(by_alias=True, exclude_none=True) + assert dumped["schema_fields"] == [{"name": "id", "type": "string"}] - @model_validator(mode="after") - def _require_inline_text(self): - if self.schema_source == SchemaSource.INLINE and not self.proto_text: - raise ValueError("proto_text is required when schema_source is 'inline'") - return self +class TestAvroFormat: + def test_avro_round_trips(self): + src = models.KafkaSource.model_validate( + _kafka(format="avro", schema={"avsc": AVSC}) + ) + assert src.format == models.KafkaFormat.AVRO + assert src.source_schema.avsc.name == "Event" + dumped = src.model_dump(by_alias=True, exclude_none=True) + assert dumped["schema"] == {"avsc": AVSC} + assert "schema_fields" not in dumped -class ProtobufFormat(SourceFormat): - type: Literal["protobuf"] = "protobuf" - protobuf: ProtobufConfig + def test_avro_requires_avsc(self): + with pytest.raises(ValueError, match="avro format requires schema.avsc"): + models.KafkaSource.model_validate(_kafka(format="avro")) - def validate_against_registry(self, has_schema_registry: bool) -> None: - if self.protobuf.schema_source == SchemaSource.REGISTRY and not ( - has_schema_registry - ): - raise ValueError( - "protobuf with schema_source 'registry' requires a schema registry " - "on the source" + def test_avsc_must_be_a_record_with_fields(self): + with pytest.raises(ValueError): + models.KafkaSource.model_validate( + _kafka(format="avro", schema={"avsc": {"type": "string", "name": "x"}}) + ) + with pytest.raises(ValueError): + models.KafkaSource.model_validate( + _kafka(format="avro", schema={"avsc": {"type": "record", "name": "E"}}) ) - -class KinesisSource(models.SourceBaseConfig): - type: Literal["kinesis"] = "kinesis" - stream_name: str - region: str - - -@pytest.fixture -def register_ee_types(): - """Register the out-of-tree types and clean up afterward.""" - registry.register_format(ProtobufFormat) - registry.register_source(KinesisSource) - yield - registry._FORMAT_CLASSES.pop("protobuf", None) - registry._SOURCE_CLASSES.pop("kinesis", None) + def test_avsc_preserves_extra_keys(self): + avsc = { + "type": "record", + "name": "Event", + "namespace": "test", + "doc": "an event", + "fields": [{"name": "meta", "type": {"type": "map", "values": "string"}}], + } + src = models.KafkaSource.model_validate( + _kafka(format="avro", schema={"avsc": avsc}) + ) + dumped = src.model_dump(by_alias=True, exclude_none=True) + assert dumped["schema"]["avsc"] == avsc -# The exact payload from the ticket discussion. -PROTOBUF_SOURCE = { - "type": "kafka", - "source_id": "events", - "connection_params": { - "brokers": ["kafka-controller-0.staging-cluster.glassflow.xyz:9094"], - "mechanism": "PLAIN", - "protocol": "SASL_PLAINTEXT", - "username": "glassflow", - "password": "secret", - }, - "topic": "test_proto_events_dedup", - "format": { - "type": "protobuf", - "protobuf": { - "schema_source": "inline", - "proto_text": 'syntax = "proto3";\nmessage Event {\n string id = 1;\n}', - }, - }, - "consumer_group_initial_offset": "earliest", -} +class TestProtobufFormat: + def test_protobuf_round_trips(self): + src = models.KafkaSource.model_validate( + _kafka(format="protobuf", schema={"proto": PROTO_TEXT, "message": "Event"}) + ) + assert src.format == models.KafkaFormat.PROTOBUF + assert src.source_schema.proto == PROTO_TEXT + assert src.source_schema.message == "Event" + dumped = src.model_dump(by_alias=True, exclude_none=True) + assert dumped["schema"] == {"proto": PROTO_TEXT, "message": "Event"} + def test_protobuf_requires_proto_and_message(self): + with pytest.raises(ValueError, match="schema.proto and schema.message"): + models.KafkaSource.model_validate( + _kafka(format="protobuf", schema={"proto": PROTO_TEXT}) + ) -class TestJsonFormat: - """OSS default format behaviour.""" - def test_format_defaults_to_none(self): +class TestParsedFields: + def test_parsed_fields_read_only_not_surfaced_or_emitted(self): + # Shape the backend returns for an avro source on GET. src = models.KafkaSource.model_validate( - { - "type": "kafka", - "source_id": "s1", - "connection_params": {"brokers": ["b:9092"], "protocol": "PLAINTEXT"}, - "topic": "t", - } + _kafka( + format="avro", + schema={ + "avsc": AVSC, + "parsed_fields": [ + {"name": "id", "type": "string"}, + {"name": "ts_ms", "type": "int"}, + ], + }, + ) ) - assert src.format is None - # Omitted from the serialized config so OSS payloads are unchanged. - assert "format" not in src.model_dump(by_alias=True, exclude_none=True) + # Available as informational backend output... + assert src.source_schema.parsed_fields[0].name == "id" + # ...but not surfaced via the schema_fields compat accessor (json only), + assert src.schema_fields is None + # ...and not emitted back on dump. + dumped = src.model_dump(by_alias=True, exclude_none=True) + assert dumped["schema"] == {"avsc": AVSC} + assert "parsed_fields" not in dumped["schema"] - def test_explicit_json_format_roundtrips(self): + def test_parsed_fields_on_json_get(self): + # GET of a json source: schema_fields (or schema.fields) plus parsed_fields. src = models.KafkaSource.model_validate( - { - "type": "kafka", - "source_id": "s1", - "connection_params": {"brokers": ["b:9092"], "protocol": "PLAINTEXT"}, - "topic": "t", - "format": {"type": "json"}, - } + _kafka( + format="json", + schema_fields=[{"name": "id", "type": "string"}], + schema={"parsed_fields": [{"name": "id", "type": "string"}]}, + ) ) - assert isinstance(src.format, models.JsonFormat) + assert src.schema_fields[0].name == "id" + assert src.source_schema.parsed_fields[0].name == "id" dumped = src.model_dump(by_alias=True, exclude_none=True) - assert dumped["format"] == {"type": "json"} + assert dumped["schema_fields"] == [{"name": "id", "type": "string"}] + assert "schema" not in dumped # parsed_fields not echoed back + - def test_unknown_format_raises(self): - with pytest.raises(ValueError, match="Unknown format type 'avro'"): +class TestSchemaFormatConsistency: + def test_avsc_without_avro_format_rejected(self): + with pytest.raises(ValueError, match="require format 'avro' or 'protobuf'"): models.KafkaSource.model_validate( - { - "type": "kafka", - "source_id": "s1", - "connection_params": { - "brokers": ["b:9092"], - "protocol": "PLAINTEXT", - }, - "topic": "t", - "format": {"type": "avro"}, - } + _kafka(format="json", schema={"avsc": AVSC}) ) -class TestRegisteredFormat: - """An out-of-tree format plugs in via the registry.""" +class TestSchemaErrorSurfacing: + """A backend 422 puts the specific cause in details.error; the SDK surfaces + it on the create/edit path instead of the generic message.""" - def test_protobuf_payload_roundtrips(self, register_ee_types): - src = models.KafkaSource.model_validate(PROTOBUF_SOURCE) + def test_invalid_schema_surfaces_backend_detail(self, pipeline, mock_track): + resp = mock_responses.create_mock_response_factory()( + status_code=422, + json_data={ + "status": 422, + "code": "unprocessable_entity", + "message": "failed to convert request to pipeline model", + "details": {"error": 'source "events": proto compilation error: boom'}, + }, + ) + with patch( + "httpx.Client.request", side_effect=resp.raise_for_status.side_effect + ): + with pytest.raises(errors.PipelineInvalidConfigurationError) as exc: + pipeline.create() - assert isinstance(src.format, ProtobufFormat) - assert src.format.protobuf.schema_source == SchemaSource.INLINE + assert "proto compilation error: boom" in str(exc.value) + assert exc.value.details["error"].startswith('source "events"') - # SerializeAsAny preserves the subclass-only nested config on dump, - # round-tripping to the exact wire shape. - dumped = src.model_dump(by_alias=True, exclude_none=True) - assert dumped["format"] == PROTOBUF_SOURCE["format"] - def test_protobuf_inline_requires_text(self, register_ee_types): - bad = {**PROTOBUF_SOURCE} - bad["format"] = {"type": "protobuf", "protobuf": {"schema_source": "inline"}} - with pytest.raises(ValueError, match="proto_text is required"): - models.KafkaSource.model_validate(bad) +# --- Source registry: an out-of-tree source type plugs in ------------------- - def test_protobuf_registry_source_requires_schema_registry(self, register_ee_types): - # schema_source 'registry' but no schema_registry on the source -> hook fires. - bad = {**PROTOBUF_SOURCE} - bad["format"] = {"type": "protobuf", "protobuf": {"schema_source": "registry"}} - with pytest.raises(ValueError, match="requires a schema registry"): - models.KafkaSource.model_validate(bad) +class KinesisSource(models.SourceBaseConfig): + type: Literal["kinesis"] = "kinesis" + stream_name: str + region: str -class TestRegisteredSourceType: - """An out-of-tree source type plugs in via the registry.""" +@pytest.fixture +def register_kinesis(): + registry.register_source(KinesisSource) + yield + registry._SOURCE_CLASSES.pop("kinesis", None) + + +class TestRegisteredSourceType: def _config(self, source: dict) -> dict: return { "pipeline_id": "p1", @@ -187,7 +243,7 @@ def _config(self, source: dict) -> dict: }, } - def test_kinesis_source_dispatches_and_roundtrips(self, register_ee_types): + def test_kinesis_dispatches_and_roundtrips(self, register_kinesis): kinesis = { "type": "kinesis", "source_id": "k1", @@ -195,12 +251,9 @@ def test_kinesis_source_dispatches_and_roundtrips(self, register_ee_types): "region": "eu-central-1", } cfg = models.PipelineConfig.model_validate(self._config(kinesis)) - assert isinstance(cfg.sources[0], KinesisSource) - # SerializeAsAny keeps kinesis-only fields through the base-typed field. dumped = cfg.model_dump(by_alias=True, exclude_none=True) assert dumped["sources"][0]["stream_name"] == "events" - assert dumped["sources"][0]["region"] == "eu-central-1" def test_unknown_source_type_raises(self): with pytest.raises(ValueError, match="Unknown source type 'pubsub'"): From bd998894de6f4f3bd5bbb93143040c0d2954e4cc Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 16 Jun 2026 14:00:04 +0000 Subject: [PATCH 10/16] docs: update coverage badge --- coverage.xml | 303 +++++++++++++++++++++++++++++---------------------- 1 file changed, 172 insertions(+), 131 deletions(-) diff --git a/coverage.xml b/coverage.xml index c8c1e81..5ba744f 100644 --- a/coverage.xml +++ b/coverage.xml @@ -1,5 +1,5 @@ - + @@ -116,7 +116,7 @@ - + @@ -128,7 +128,7 @@ - + @@ -162,41 +162,43 @@ - + - + + - - - + + + + - - - - - - + + + + - - - - - + + + - - - - + + + + + + + + @@ -308,32 +310,33 @@ - + - + - + - + - + - + + - + @@ -483,11 +486,15 @@ - - + + + + + + @@ -523,7 +530,7 @@ - + @@ -536,8 +543,8 @@ - - + + @@ -832,39 +839,33 @@ - + + - + - - - + + - + - + + + + + - - - - - - - - - - @@ -1048,7 +1049,7 @@ - + @@ -1058,142 +1059,182 @@ - - - - - - - - + + + + + + + - - - - - - - - - - - - - - - - - + - - - - - + - - + + - - - + + + - - + + - - + + + + - - - - - - - - - - - - - - - - - - - + + + - - + + + + - + + - - + + - - - - - + + - - + + + - - + + + + + + - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + From f96cd3913747b0b4f2625efebaf7b45851543278 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 16 Jun 2026 14:02:31 +0000 Subject: [PATCH 11/16] chore: bump version to 4.1.0 --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index fcdb2e1..ee74734 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.0.0 +4.1.0 From 74571aa1d5b182259332f882a4ef81acf3e4c6bf Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 16 Jun 2026 14:03:01 +0000 Subject: [PATCH 12/16] docs: update coverage badge --- coverage.xml | 70 ++++++++++++++++++++++++++-------------------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/coverage.xml b/coverage.xml index 5ba744f..c128bfc 100644 --- a/coverage.xml +++ b/coverage.xml @@ -1,5 +1,5 @@ - + @@ -7,9 +7,9 @@ /home/runner/work/glassflow-python-sdk/glassflow-python-sdk/src/glassflow - + - + @@ -19,7 +19,7 @@ - + @@ -29,7 +29,7 @@ - + @@ -98,7 +98,7 @@ - + @@ -116,9 +116,9 @@ - + - + @@ -128,7 +128,7 @@ - + @@ -201,7 +201,7 @@ - + @@ -253,7 +253,7 @@ - + @@ -299,7 +299,7 @@ - + @@ -336,7 +336,7 @@ - + @@ -497,7 +497,7 @@ - + @@ -530,9 +530,9 @@ - + - + @@ -547,7 +547,7 @@ - + @@ -562,7 +562,7 @@ - + @@ -601,7 +601,7 @@ - + @@ -687,7 +687,7 @@ - + @@ -696,7 +696,7 @@ - + @@ -839,7 +839,7 @@ - + @@ -868,7 +868,7 @@ - + @@ -946,7 +946,7 @@ - + @@ -1028,7 +1028,7 @@ - + @@ -1049,9 +1049,9 @@ - + - + @@ -1069,7 +1069,7 @@ - + @@ -1237,7 +1237,7 @@ - + @@ -1255,9 +1255,9 @@ - + - + @@ -1287,7 +1287,7 @@ - + @@ -1310,7 +1310,7 @@ - + @@ -1318,7 +1318,7 @@ - + @@ -1374,7 +1374,7 @@ - + From 69359ab3d63bc026040e4e3ad3699a956bd2b6b9 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Tue, 16 Jun 2026 16:14:57 +0200 Subject: [PATCH 13/16] feat(ee): add Pipeline.get_streams for NATS stream names (ETL-1188) Add ee.Pipeline.get_streams(), wrapping GET /api/v1/pipeline/{id}/streams. It returns the pipeline's NATS JetStream streams as [{stream_name, component}], useful for diagnosing NATS-level issues. 404 maps to PipelineNotFoundError and 403 to FeatureNotLicensedError. Enterprise-only, so it lives on ee.Pipeline. Verified live against staging pipelines. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/glassflow/ee/pipeline.py | 40 ++++++++++++++++++++++++-- tests/test_ee.py | 55 ++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 2 deletions(-) diff --git a/src/glassflow/ee/pipeline.py b/src/glassflow/ee/pipeline.py index 7dd57fd..f1a936f 100644 --- a/src/glassflow/ee/pipeline.py +++ b/src/glassflow/ee/pipeline.py @@ -1,5 +1,8 @@ from __future__ import annotations +from typing import Any, Dict, List + +from glassflow.etl import errors from glassflow.etl.pipeline import Pipeline as _OSSPipeline from .dlq import DLQ @@ -10,8 +13,9 @@ class Pipeline(_OSSPipeline): Extends the open-source :class:`glassflow.etl.pipeline.Pipeline`. Its ``dlq`` property exposes the Enterprise :class:`~.dlq.DLQ` (with - ``list``/``reprocess``/``discard``). Construction is inherited unchanged; - only the DLQ collaborator class is swapped via ``_dlq_class``. + ``list``/``reprocess``/``discard``), and it adds :meth:`get_streams`. + Construction is inherited unchanged; only the DLQ collaborator class is + swapped via ``_dlq_class``. """ _dlq_class = DLQ @@ -24,3 +28,35 @@ def dlq(self) -> DLQ: @dlq.setter def dlq(self, dlq: DLQ) -> None: self._dlq = dlq + + def get_streams(self) -> List[Dict[str, Any]]: + """Return the NATS JetStream streams backing this pipeline. + + Each entry has a ``stream_name`` and the ``component`` the stream belongs + to (for example ``ingestor``, ``join``, ``sink``, ``dedup``, ``dlq``). + Useful for diagnosing NATS-level issues. + + Returns: + List of ``{"stream_name": ..., "component": ...}`` dicts. + + Raises: + PipelineNotFoundError: If the pipeline does not exist. + FeatureNotLicensedError: If the backend is not licensed for this. + APIError: If the API request fails. + """ + try: + response = self._request( + "GET", + f"{self.ENDPOINT}/{self.pipeline_id}/streams", + event_name="PipelineStreamsGet", + ) + except errors.ForbiddenError as e: + raise errors.FeatureNotLicensedError( + status_code=e.status_code, + message="Pipeline streams require a GlassFlow Enterprise license", + response=e.response, + details=e.details, + ) from e + if response.status_code == 204 or not response.content: + return [] + return response.json().get("streams", []) diff --git a/tests/test_ee.py b/tests/test_ee.py index 53e6c5d..48446f0 100644 --- a/tests/test_ee.py +++ b/tests/test_ee.py @@ -3,11 +3,15 @@ DLQ-specific Enterprise capabilities are covered in a follow-up PR. """ +from unittest.mock import patch + import pytest from glassflow import ee +from glassflow.etl import errors from glassflow.etl.client import Client as OSSClient from glassflow.etl.pipeline import Pipeline as OSSPipeline +from tests.data import mock_responses @pytest.fixture @@ -43,3 +47,54 @@ def test_get_pipeline_returns_ee_pipeline( pipeline = client.get_pipeline("test-pipeline-id") assert isinstance(pipeline, ee.Pipeline) + + +class TestGetStreams: + @pytest.fixture + def ee_pipeline_by_id(self): + return ee.Pipeline(host="http://localhost:8080", pipeline_id="p1") + + def test_get_streams_success(self, ee_pipeline_by_id, mock_success, mock_track): + payload = { + "pipeline_id": "p1", + "streams": [ + {"stream_name": "gfm-abc-DLQ", "component": "dlq"}, + {"stream_name": "gfm-abc-ingestor", "component": "ingestor"}, + ], + } + with mock_success(json_payloads=[payload]) as mock_get: + streams = ee_pipeline_by_id.get_streams() + + mock_get.assert_called_once_with("GET", "/api/v1/pipeline/p1/streams") + assert streams == payload["streams"] + assert streams[0]["component"] == "dlq" + + def test_get_streams_empty_on_204(self, ee_pipeline_by_id, mock_track): + resp = mock_responses.create_mock_response_factory()( + status_code=204, json_data=None + ) + with patch("httpx.Client.request", return_value=resp): + assert ee_pipeline_by_id.get_streams() == [] + + def test_get_streams_not_found(self, ee_pipeline_by_id, mock_track): + resp = mock_responses.create_mock_response_factory()( + status_code=404, json_data={"message": "not found"} + ) + with patch( + "httpx.Client.request", side_effect=resp.raise_for_status.side_effect + ): + with pytest.raises(errors.PipelineNotFoundError): + ee_pipeline_by_id.get_streams() + + def test_get_streams_forbidden_maps_to_feature_not_licensed( + self, ee_pipeline_by_id, mock_track + ): + resp = mock_responses.create_mock_response_factory()( + status_code=403, json_data={"message": "Forbidden"} + ) + with patch( + "httpx.Client.request", side_effect=resp.raise_for_status.side_effect + ): + with pytest.raises(errors.FeatureNotLicensedError) as exc: + ee_pipeline_by_id.get_streams() + assert isinstance(exc.value, errors.ForbiddenError) From 9c25d52633c99620d2a0c07ec971a260b7c0edbb Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 16 Jun 2026 14:17:39 +0000 Subject: [PATCH 14/16] docs: update coverage badge --- coverage.xml | 90 +++++++++++++++++++++++++++++----------------------- 1 file changed, 50 insertions(+), 40 deletions(-) diff --git a/coverage.xml b/coverage.xml index c128bfc..59a16dc 100644 --- a/coverage.xml +++ b/coverage.xml @@ -1,5 +1,5 @@ - + @@ -7,9 +7,9 @@ /home/runner/work/glassflow-python-sdk/glassflow-python-sdk/src/glassflow - + - + @@ -19,7 +19,7 @@ - + @@ -29,7 +29,7 @@ - + @@ -98,27 +98,37 @@ - + + - - - - + + + - + + + + + + + + + + + - + - + @@ -128,7 +138,7 @@ - + @@ -201,7 +211,7 @@ - + @@ -253,7 +263,7 @@ - + @@ -299,7 +309,7 @@ - + @@ -336,7 +346,7 @@ - + @@ -497,7 +507,7 @@ - + @@ -530,9 +540,9 @@ - + - + @@ -547,7 +557,7 @@ - + @@ -562,7 +572,7 @@ - + @@ -601,7 +611,7 @@ - + @@ -687,7 +697,7 @@ - + @@ -696,7 +706,7 @@ - + @@ -839,7 +849,7 @@ - + @@ -868,7 +878,7 @@ - + @@ -946,7 +956,7 @@ - + @@ -1028,7 +1038,7 @@ - + @@ -1049,9 +1059,9 @@ - + - + @@ -1069,7 +1079,7 @@ - + @@ -1237,7 +1247,7 @@ - + @@ -1255,9 +1265,9 @@ - + - + @@ -1287,7 +1297,7 @@ - + @@ -1310,7 +1320,7 @@ - + @@ -1318,7 +1328,7 @@ - + @@ -1374,7 +1384,7 @@ - + From 4521d548a1577a9d3f16b38bc1f38e8d31ec960a Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Tue, 16 Jun 2026 16:50:52 +0200 Subject: [PATCH 15/16] refactor(models): unify Kafka avro/protobuf schema under schema.file Update the Kafka source schema to the latest backend contract: - The Avro `.avsc` and Protobuf `.proto` schemas are unified into a single inline-string `schema.file`; the protobuf message name is `schema.message_type` (was `message`). The structural AvroSchema model is removed (the avsc is now an opaque string validated by the backend). - json fields now serialize to `schema.fields` (the unified object); the legacy top-level `schema_fields` is still accepted on input and upgraded to `schema.fields`. This removes the previous split serializer. - `schema.parsed_fields` remains read-only: parsed on GET, exposed for inspection, never sent back on create/edit. Validation: avro requires `schema.file`; protobuf requires `schema.file` and `schema.message_type`; using either with a non-avro/protobuf format is rejected. --- src/glassflow/etl/models/__init__.py | 2 - src/glassflow/etl/models/sources/__init__.py | 2 - src/glassflow/etl/models/sources/kafka.py | 116 +++++----------- tests/test_models/test_source_formats.py | 134 ++++++------------- 4 files changed, 72 insertions(+), 182 deletions(-) diff --git a/src/glassflow/etl/models/__init__.py b/src/glassflow/etl/models/__init__.py index 194a583..edda7e0 100644 --- a/src/glassflow/etl/models/__init__.py +++ b/src/glassflow/etl/models/__init__.py @@ -27,7 +27,6 @@ from .source import SourceBaseConfig, SourceBaseConfigPatch, SourceType from .sources import ( AnySource, - AvroSchema, ConsumerGroupOffset, KafkaConnectionParams, KafkaConnectionParamsPatch, @@ -65,7 +64,6 @@ __all__ = [ "AnySource", - "AvroSchema", "ClickhouseConnectionParams", "ClickhouseConnectionParamsPatch", "ClickhouseDataType", diff --git a/src/glassflow/etl/models/sources/__init__.py b/src/glassflow/etl/models/sources/__init__.py index 8b62a29..bb93ca8 100644 --- a/src/glassflow/etl/models/sources/__init__.py +++ b/src/glassflow/etl/models/sources/__init__.py @@ -17,7 +17,6 @@ from ..registry import register_source from ..source import SourceBaseConfig, SourceBaseConfigPatch, SourceType from .kafka import ( - AvroSchema, ConsumerGroupOffset, KafkaConnectionParams, KafkaConnectionParamsPatch, @@ -68,7 +67,6 @@ "SourceBaseConfig", "SourceBaseConfigPatch", # Kafka - "AvroSchema", "ConsumerGroupOffset", "KafkaConnectionParams", "KafkaConnectionParamsPatch", diff --git a/src/glassflow/etl/models/sources/kafka.py b/src/glassflow/etl/models/sources/kafka.py index 04ef287..092b269 100644 --- a/src/glassflow/etl/models/sources/kafka.py +++ b/src/glassflow/etl/models/sources/kafka.py @@ -1,14 +1,8 @@ """Kafka source models.""" -from typing import Any, Dict, List, Literal, Optional +from typing import Any, List, Literal, Optional -from pydantic import ( - BaseModel, - ConfigDict, - Field, - model_serializer, - model_validator, -) +from pydantic import BaseModel, ConfigDict, Field, model_validator from ..base import CaseInsensitiveStrEnum from ..data_types import KafkaDataType @@ -56,42 +50,24 @@ class KafkaField(BaseModel): type: KafkaDataType -class AvroSchema(BaseModel): - """The Avro schema record (``schema.avsc``) for an Avro Kafka source. - - GlassFlow maps the record's root-level fields to ClickHouse columns, so the - top-level schema must be an Avro ``record`` with a ``name`` and a non-empty - ``fields`` list. Individual field ``type`` values may themselves be nested - Avro schemas. Other Avro keys (``namespace``, ``doc``, ``aliases``, ...) are - accepted and preserved on round-trip. - """ - - model_config = ConfigDict(extra="allow") - - type: Literal["record"] - name: str - fields: List[Dict[str, Any]] = Field(min_length=1) - namespace: Optional[str] = Field(default=None) - - class KafkaSchema(BaseModel): """Unified schema for a Kafka source. The shape used is selected by the source's :class:`KafkaFormat`: - ``json`` -> ``fields`` (GlassFlow field declarations) - - ``avro`` -> ``avsc`` (the Avro schema record) - - ``protobuf`` -> ``proto`` (the ``.proto`` text) and ``message`` (the - message name within it) + - ``avro`` -> ``file`` (the inline ``.avsc`` schema text) + - ``protobuf`` -> ``file`` (the inline ``.proto`` text) and ``message_type`` + (the message within it to decode) On reads the backend also returns ``parsed_fields``: the field list parsed - from the avsc/proto. It is read-only and is not sent back on create/edit. + from the avsc/proto. It is read-only, exposed for inspection, and never sent + back on create/edit. """ fields: Optional[List[KafkaField]] = Field(default=None) - avsc: Optional[AvroSchema] = Field(default=None) - proto: Optional[str] = Field(default=None) - message: Optional[str] = Field(default=None) - parsed_fields: Optional[List[KafkaField]] = Field(default=None) + file: Optional[str] = Field(default=None) + message_type: Optional[str] = Field(default=None) + parsed_fields: Optional[List[KafkaField]] = Field(default=None, exclude=True) class KafkaConnectionParams(BaseModel): @@ -123,32 +99,22 @@ def update(self, patch: "KafkaConnectionParamsPatch") -> "KafkaConnectionParams" def _parse_source_schema(data: Any) -> Any: - """Fold both wire schema shapes into the unified ``source_schema``: - - - top-level ``schema_fields`` (Open Source, and Enterprise for compatibility) - - the unified ``schema`` object (Enterprise: avsc / proto+message / - parsed_fields, and optionally fields) - - Also drops an empty ``schema_registry``. Left untouched if ``source_schema`` - is already supplied directly. + """Accept the legacy top-level ``schema_fields`` (deprecated) by folding it + into the unified ``schema`` object's ``fields``. The ``schema`` object itself + maps directly to ``source_schema`` via its alias. Also drops an empty + ``schema_registry``. Left untouched if ``source_schema`` is supplied by name. """ if not isinstance(data, dict) or "source_schema" in data: return data data = dict(data) if data.get("schema_registry", None) == {}: data.pop("schema_registry", None) - - schema: Dict[str, Any] = {} if "schema_fields" in data: - schema["fields"] = data.pop("schema_fields") - if "schema" in data: - wire = data.pop("schema") - if isinstance(wire, dict): - schema.update(wire) - else: - data["schema"] = wire # let validation reject a non-object schema - if schema: - data["source_schema"] = schema + fields = data.pop("schema_fields") + schema = data.get("schema") + schema = dict(schema) if isinstance(schema, dict) else {} + schema.setdefault("fields", fields) + data["schema"] = schema return data @@ -171,35 +137,17 @@ class KafkaSource(SourceBaseConfig): # omitted from the serialized config. ``avro`` and ``protobuf`` are # Enterprise features and are rejected by an unlicensed backend. format: Optional[KafkaFormat] = Field(default=None) - # All schema-related config in one place. Serialized back to the wire as - # top-level ``schema_fields`` (json, for Open Source / Enterprise compat) or - # the ``schema`` object (avro/protobuf); see the serializer below. - source_schema: Optional[KafkaSchema] = Field(default=None) + # All schema-related config in one place, serialized as the ``schema`` + # object (``fields`` for json, ``file`` [+ ``message_type``] for + # avro/protobuf). The legacy top-level ``schema_fields`` is still accepted on + # input (see _parse_source_schema). + source_schema: Optional[KafkaSchema] = Field(default=None, alias="schema") @model_validator(mode="before") @classmethod def parse_schema(cls, data: Any) -> Any: return _parse_source_schema(data) - @model_serializer(mode="wrap") - def serialize_schema(self, handler: Any) -> Any: - """Emit json fields as top-level ``schema_fields`` (compatible with both - editions) and avro/protobuf as the ``schema`` object. ``parsed_fields`` - is read-only and is not emitted.""" - data = handler(self) - schema = data.pop("source_schema", None) - if schema: - if schema.get("fields") is not None: - data["schema_fields"] = schema["fields"] - inner = { - k: schema[k] - for k in ("avsc", "proto", "message") - if schema.get(k) is not None - } - if inner: - data["schema"] = inner - return data - @property def schema_fields(self) -> Optional[List[KafkaField]]: """Backward-compatible accessor for the JSON field declarations, held @@ -221,18 +169,16 @@ def validate_format_schema(self) -> "KafkaSource": """The schema shape must match the declared format.""" schema = self.source_schema if self.format == KafkaFormat.AVRO: - if schema is None or schema.avsc is None: - raise ValueError("avro format requires schema.avsc") + if schema is None or not schema.file: + raise ValueError("avro format requires schema.file") elif self.format == KafkaFormat.PROTOBUF: - if schema is None or not schema.proto or not schema.message: + if schema is None or not schema.file or not schema.message_type: raise ValueError( - "protobuf format requires schema.proto and schema.message" + "protobuf format requires schema.file and schema.message_type" ) - elif schema is not None and ( - schema.avsc is not None or schema.proto or schema.message - ): + elif schema is not None and (schema.file or schema.message_type): raise ValueError( - "schema.avsc / proto / message require format 'avro' or 'protobuf'" + "schema.file / message_type require format 'avro' or 'protobuf'" ) return self @@ -282,7 +228,7 @@ class KafkaSourcePatch(SourceBaseConfigPatch): connection_params: Optional[KafkaConnectionParamsPatch] = Field(default=None) topic: Optional[str] = Field(default=None) format: Optional[KafkaFormat] = Field(default=None) - source_schema: Optional[KafkaSchema] = Field(default=None) + source_schema: Optional[KafkaSchema] = Field(default=None, alias="schema") @model_validator(mode="before") @classmethod diff --git a/tests/test_models/test_source_formats.py b/tests/test_models/test_source_formats.py index 6a70818..2306e10 100644 --- a/tests/test_models/test_source_formats.py +++ b/tests/test_models/test_source_formats.py @@ -1,9 +1,11 @@ """Tests for Kafka source formats (json/avro/protobuf) and the source registry. -The unified ``source_schema`` holds all schema config. It serializes json to a -top-level ``schema_fields`` (compatible with Open Source and Enterprise) and -avro/protobuf to the ``schema`` object (``avsc`` / ``proto``+``message``). On -reads the backend may also return ``schema.parsed_fields``. +The unified ``source_schema`` (wire key ``schema``) holds all schema config: +``fields`` (json), ``file`` (the inline avsc/proto text), ``message_type`` +(protobuf), and read-only ``parsed_fields`` (returned on GET). The legacy +top-level ``schema_fields`` is still accepted on input and upgraded to +``schema.fields``. The source registry lets an out-of-tree source type plug in +without changing OSS models. """ from typing import Literal @@ -28,15 +30,9 @@ def _kafka(**overrides) -> dict: return base -AVSC = { - "type": "record", - "name": "Event", - "namespace": "test", - "fields": [ - {"name": "id", "type": "string"}, - {"name": "ts_ms", "type": "long"}, - ], -} +AVSC_TEXT = ( + '{"type": "record", "name": "Event", "fields": [{"name": "id", "type": "string"}]}' +) PROTO_TEXT = 'syntax = "proto3";\npackage test;\nmessage Event {\n string id = 1;\n}' @@ -46,104 +42,71 @@ def test_format_defaults_to_none_and_is_omitted(self): assert src.format is None assert "format" not in src.model_dump(by_alias=True, exclude_none=True) - def test_top_level_schema_fields_round_trips(self): - wire = _kafka( - format="json", - schema_fields=[ - {"name": "id", "type": "string"}, - {"name": "ts_ms", "type": "int"}, - ], - ) + def test_legacy_schema_fields_accepted_and_upgraded(self): + # The deprecated top-level schema_fields is folded into schema.fields. + wire = _kafka(format="json", schema_fields=[{"name": "id", "type": "string"}]) src = models.KafkaSource.model_validate(wire) assert src.source_schema.fields[0].name == "id" assert src.schema_fields[0].name == "id" # compat accessor dumped = src.model_dump(by_alias=True, exclude_none=True) - # json always serializes to top-level schema_fields (OSS + EE compat) - assert dumped["schema_fields"] == wire["schema_fields"] - assert "schema" not in dumped - assert "source_schema" not in dumped + assert dumped["schema"] == {"fields": [{"name": "id", "type": "string"}]} + assert "schema_fields" not in dumped # upgraded to the unified schema - def test_unified_schema_fields_input_also_accepted(self): - # EE may return json fields under the unified schema object. - src = models.KafkaSource.model_validate( - _kafka(format="json", schema={"fields": [{"name": "id", "type": "string"}]}) + def test_schema_fields_round_trips(self): + wire = _kafka( + format="json", schema={"fields": [{"name": "id", "type": "string"}]} ) + src = models.KafkaSource.model_validate(wire) assert src.schema_fields[0].name == "id" - # still emits the compatible top-level form dumped = src.model_dump(by_alias=True, exclude_none=True) - assert dumped["schema_fields"] == [{"name": "id", "type": "string"}] + assert dumped["schema"] == {"fields": [{"name": "id", "type": "string"}]} class TestAvroFormat: def test_avro_round_trips(self): src = models.KafkaSource.model_validate( - _kafka(format="avro", schema={"avsc": AVSC}) + _kafka(format="avro", schema={"file": AVSC_TEXT}) ) assert src.format == models.KafkaFormat.AVRO - assert src.source_schema.avsc.name == "Event" + assert src.source_schema.file == AVSC_TEXT dumped = src.model_dump(by_alias=True, exclude_none=True) - assert dumped["schema"] == {"avsc": AVSC} - assert "schema_fields" not in dumped + assert dumped["schema"] == {"file": AVSC_TEXT} - def test_avro_requires_avsc(self): - with pytest.raises(ValueError, match="avro format requires schema.avsc"): + def test_avro_requires_file(self): + with pytest.raises(ValueError, match="avro format requires schema.file"): models.KafkaSource.model_validate(_kafka(format="avro")) - def test_avsc_must_be_a_record_with_fields(self): - with pytest.raises(ValueError): - models.KafkaSource.model_validate( - _kafka(format="avro", schema={"avsc": {"type": "string", "name": "x"}}) - ) - with pytest.raises(ValueError): - models.KafkaSource.model_validate( - _kafka(format="avro", schema={"avsc": {"type": "record", "name": "E"}}) - ) - - def test_avsc_preserves_extra_keys(self): - avsc = { - "type": "record", - "name": "Event", - "namespace": "test", - "doc": "an event", - "fields": [{"name": "meta", "type": {"type": "map", "values": "string"}}], - } - src = models.KafkaSource.model_validate( - _kafka(format="avro", schema={"avsc": avsc}) - ) - dumped = src.model_dump(by_alias=True, exclude_none=True) - assert dumped["schema"]["avsc"] == avsc - class TestProtobufFormat: def test_protobuf_round_trips(self): src = models.KafkaSource.model_validate( - _kafka(format="protobuf", schema={"proto": PROTO_TEXT, "message": "Event"}) + _kafka( + format="protobuf", + schema={"file": PROTO_TEXT, "message_type": "Event"}, + ) ) assert src.format == models.KafkaFormat.PROTOBUF - assert src.source_schema.proto == PROTO_TEXT - assert src.source_schema.message == "Event" + assert src.source_schema.file == PROTO_TEXT + assert src.source_schema.message_type == "Event" dumped = src.model_dump(by_alias=True, exclude_none=True) - assert dumped["schema"] == {"proto": PROTO_TEXT, "message": "Event"} + assert dumped["schema"] == {"file": PROTO_TEXT, "message_type": "Event"} - def test_protobuf_requires_proto_and_message(self): - with pytest.raises(ValueError, match="schema.proto and schema.message"): + def test_protobuf_requires_file_and_message_type(self): + with pytest.raises(ValueError, match="schema.file and schema.message_type"): models.KafkaSource.model_validate( - _kafka(format="protobuf", schema={"proto": PROTO_TEXT}) + _kafka(format="protobuf", schema={"file": PROTO_TEXT}) ) class TestParsedFields: - def test_parsed_fields_read_only_not_surfaced_or_emitted(self): - # Shape the backend returns for an avro source on GET. + def test_parsed_fields_read_only(self): + # Shape the backend returns on GET for an avro source. src = models.KafkaSource.model_validate( _kafka( format="avro", schema={ - "avsc": AVSC, - "parsed_fields": [ - {"name": "id", "type": "string"}, - {"name": "ts_ms", "type": "int"}, - ], + "file": AVSC_TEXT, + "parsed_fields": [{"name": "id", "type": "string"}], }, ) ) @@ -153,30 +116,15 @@ def test_parsed_fields_read_only_not_surfaced_or_emitted(self): assert src.schema_fields is None # ...and not emitted back on dump. dumped = src.model_dump(by_alias=True, exclude_none=True) - assert dumped["schema"] == {"avsc": AVSC} + assert dumped["schema"] == {"file": AVSC_TEXT} assert "parsed_fields" not in dumped["schema"] - def test_parsed_fields_on_json_get(self): - # GET of a json source: schema_fields (or schema.fields) plus parsed_fields. - src = models.KafkaSource.model_validate( - _kafka( - format="json", - schema_fields=[{"name": "id", "type": "string"}], - schema={"parsed_fields": [{"name": "id", "type": "string"}]}, - ) - ) - assert src.schema_fields[0].name == "id" - assert src.source_schema.parsed_fields[0].name == "id" - dumped = src.model_dump(by_alias=True, exclude_none=True) - assert dumped["schema_fields"] == [{"name": "id", "type": "string"}] - assert "schema" not in dumped # parsed_fields not echoed back - class TestSchemaFormatConsistency: - def test_avsc_without_avro_format_rejected(self): + def test_file_without_avro_or_protobuf_format_rejected(self): with pytest.raises(ValueError, match="require format 'avro' or 'protobuf'"): models.KafkaSource.model_validate( - _kafka(format="json", schema={"avsc": AVSC}) + _kafka(format="json", schema={"file": AVSC_TEXT}) ) From 0da244d0f1ea9b3977409cfc90852f076642a43e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 16 Jun 2026 14:52:21 +0000 Subject: [PATCH 16/16] docs: update coverage badge --- coverage.xml | 258 +++++++++++++++++++++++---------------------------- 1 file changed, 118 insertions(+), 140 deletions(-) diff --git a/coverage.xml b/coverage.xml index 59a16dc..afee7dd 100644 --- a/coverage.xml +++ b/coverage.xml @@ -1,5 +1,5 @@ - + @@ -7,9 +7,9 @@ /home/runner/work/glassflow-python-sdk/glassflow-python-sdk/src/glassflow - + - + @@ -19,7 +19,7 @@ - + @@ -29,7 +29,7 @@ - + @@ -98,7 +98,7 @@ - + @@ -126,9 +126,9 @@ - + - + @@ -138,7 +138,7 @@ - + @@ -211,7 +211,7 @@ - + @@ -263,7 +263,7 @@ - + @@ -309,7 +309,7 @@ - + @@ -346,7 +346,7 @@ - + @@ -507,7 +507,7 @@ - + @@ -540,9 +540,9 @@ - + - + @@ -553,11 +553,11 @@ - - + + - + @@ -572,7 +572,7 @@ - + @@ -611,7 +611,7 @@ - + @@ -697,7 +697,7 @@ - + @@ -706,7 +706,7 @@ - + @@ -849,7 +849,7 @@ - + @@ -878,7 +878,7 @@ - + @@ -956,7 +956,7 @@ - + @@ -1038,7 +1038,7 @@ - + @@ -1059,9 +1059,9 @@ - + - + @@ -1069,21 +1069,25 @@ - - - + + + + - - - - + + + - + + + + + @@ -1091,163 +1095,137 @@ - + - - + + - - - + + + - - - + - - - - + + + + - - + + + - + + + + + + + + + + + + - - + + - - - - - - - - + + - + - + + - - + - - + + + + + + - - - - - - - + - - - + + - + + + + - - - + + + - + + + - - - + + - - - - + + + - - + + + + + - - - - + + + - + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + @@ -1265,9 +1243,9 @@ - + - + @@ -1297,7 +1275,7 @@ - + @@ -1320,7 +1298,7 @@ - + @@ -1328,7 +1306,7 @@ - + @@ -1384,7 +1362,7 @@ - +