diff --git a/src/glassflow/etl/api_client.py b/src/glassflow/etl/api_client.py index 701a874..362dfcc 100644 --- a/src/glassflow/etl/api_client.py +++ b/src/glassflow/etl/api_client.py @@ -67,66 +67,81 @@ def _raise_api_error(response: httpx.Response) -> None: error_data = response.json() message = error_data.get("message", None) code = error_data.get("code", None) + details = error_data.get("details", None) except json.JSONDecodeError: message = f"{status_code} {response.reason_phrase}" code = None + details = None error_data = {} if status_code == 400: # Handle specific status validation error codes if code == "TERMINAL_STATE_VIOLATION": raise errors.TerminalStateViolationError( - status_code, message, response=response + status_code, message, response=response, details=details ) elif code == "INVALID_STATUS_TRANSITION": raise errors.InvalidStatusTransitionError( - status_code, - message, - response=response, + status_code, message, response=response, details=details ) elif code == "UNKNOWN_STATUS": - raise errors.UnknownStatusError(status_code, message, response=response) + raise errors.UnknownStatusError( + status_code, message, response=response, details=details + ) elif code == "PIPELINE_ALREADY_IN_STATE": raise errors.PipelineAlreadyInStateError( - status_code, message, response=response + status_code, message, response=response, details=details ) elif code == "PIPELINE_IN_TRANSITION": raise errors.PipelineInTransitionError( - status_code, message, response=response + status_code, message, response=response, details=details ) elif message and message.startswith("invalid json:"): - raise errors.InvalidJsonError(status_code, message, response=response) + raise errors.InvalidJsonError( + status_code, message, response=response, details=details + ) elif message and message == "pipeline id cannot be empty": raise errors.EmptyPipelineIdError( - status_code, message, response=response + status_code, message, response=response, details=details ) elif message and message.startswith( "pipeline can only be deleted if it's stopped or terminated" ): raise errors.PipelineDeletionStateViolationError( - status_code, message, response=response + status_code, message, response=response, details=details ) else: # Generic 400 error for unknown codes - raise errors.ValidationError(status_code, message, response=response) + raise errors.ValidationError( + status_code, message, response=response, details=details + ) elif status_code == 403: - raise errors.ForbiddenError(status_code, message, response=response) + raise errors.ForbiddenError( + status_code, message, response=response, details=details + ) elif status_code == 404: - raise errors.NotFoundError(status_code, message, response=response) + raise errors.NotFoundError( + status_code, message, response=response, details=details + ) elif status_code == 409: - raise errors.ConflictError(status_code, message, response=response) + raise errors.ConflictError( + status_code, message, response=response, details=details + ) elif status_code == 422: raise errors.UnprocessableContentError( - status_code, message, response=response + status_code, message, response=response, details=details ) elif status_code == 500: - raise errors.ServerError(status_code, message, response=response) + raise errors.ServerError( + status_code, message, response=response, details=details + ) else: raise errors.APIError( status_code, message="An error occurred: " f"({status_code} {response.reason_phrase}) {message}", response=response, + details=details, ) def _track_event(self, event_name: str, **kwargs: Any) -> None: diff --git a/src/glassflow/etl/errors.py b/src/glassflow/etl/errors.py index b903440..b92f015 100644 --- a/src/glassflow/etl/errors.py +++ b/src/glassflow/etl/errors.py @@ -15,10 +15,14 @@ class ConnectionError(RequestError): class APIError(GlassFlowError): """Base for API response errors.""" - def __init__(self, status_code, message=None, response=None): + def __init__(self, status_code, message=None, response=None, details=None): self.status_code = status_code self.response = response self.message = message + # The API's structured ``details`` object, when present. For invalid + # configs it carries the specific cause under ``details["error"]`` (for + # example an Avro/Protobuf schema compilation error). + self.details = details or {} super().__init__(self.message) diff --git a/src/glassflow/etl/models/__init__.py b/src/glassflow/etl/models/__init__.py index 54df304..194a583 100644 --- a/src/glassflow/etl/models/__init__.py +++ b/src/glassflow/etl/models/__init__.py @@ -27,13 +27,15 @@ from .source import SourceBaseConfig, SourceBaseConfigPatch, SourceType from .sources import ( AnySource, + AvroSchema, ConsumerGroupOffset, - JsonFormat, KafkaConnectionParams, KafkaConnectionParamsPatch, KafkaField, + KafkaFormat, KafkaMechanism, KafkaProtocol, + KafkaSchema, KafkaSource, KafkaSourcePatch, OTLPLogsSource, @@ -42,7 +44,6 @@ OTLPTracesSource, SchemaRegistry, SourceConfig, - SourceFormat, ) from .transforms import ( DedupTransform, @@ -64,6 +65,7 @@ __all__ = [ "AnySource", + "AvroSchema", "ClickhouseConnectionParams", "ClickhouseConnectionParamsPatch", "ClickhouseDataType", @@ -77,15 +79,16 @@ "JoinConfig", "JoinConfigPatch", "JoinOutputField", - "JsonFormat", "JoinSourceConfig", "JoinType", "KafkaConnectionParams", "KafkaConnectionParamsPatch", "KafkaDataType", "KafkaField", + "KafkaFormat", "KafkaMechanism", "KafkaProtocol", + "KafkaSchema", "KafkaSource", "KafkaSourcePatch", "MetadataConfig", @@ -109,7 +112,6 @@ "SourceBaseConfig", "SourceBaseConfigPatch", "SourceConfig", - "SourceFormat", "SourceResourceEntry", "SourceType", "StatelessTransform", diff --git a/src/glassflow/etl/models/registry.py b/src/glassflow/etl/models/registry.py index 14b436c..7ed65da 100644 --- a/src/glassflow/etl/models/registry.py +++ b/src/glassflow/etl/models/registry.py @@ -1,17 +1,15 @@ -"""Extensible registries for source types and source formats. +"""Extensible registry for source types. -Source types (``kafka``, ``otlp.logs``, ...) and source formats (``json``, -and Enterprise ``avro``/``protobuf``) are open-ended: the Enterprise SDK adds +Source types (``kafka``, ``otlp.logs``, ...) are open-ended: an edition can add new ones without modifying this package. Instead of a static Pydantic discriminated union (whose members are frozen at class-definition time), models -annotate their fields with the base type and dispatch each value to the -concrete class by its ``type`` string at validation time, looked up here. +annotate their field with the base type and dispatch each value to the concrete +class by its ``type`` string at validation time, looked up here. Editions register their classes at import:: - from glassflow.etl.models.registry import register_source, register_format + from glassflow.etl.models.registry import register_source register_source(KinesisSource) - register_format(AvroFormat) """ from __future__ import annotations @@ -23,7 +21,6 @@ _T = TypeVar("_T", bound=BaseModel) _SOURCE_CLASSES: Dict[str, Type[BaseModel]] = {} -_FORMAT_CLASSES: Dict[str, Type[BaseModel]] = {} def _type_key(cls: Type[BaseModel]) -> str: @@ -44,13 +41,6 @@ def register_source(cls: Type[_T]) -> Type[_T]: return cls -def register_format(cls: Type[_T]) -> Type[_T]: - """Register a source format class, keyed by its ``type`` default. Usable as - a decorator.""" - _FORMAT_CLASSES[_type_key(cls)] = cls - return cls - - def _resolve(value: Any, registry: Dict[str, Type[BaseModel]], kind: str) -> Any: """Coerce a raw dict to the registered concrete model by its ``type``. @@ -69,7 +59,3 @@ def _resolve(value: Any, registry: Dict[str, Type[BaseModel]], kind: str) -> Any def resolve_source(value: Any) -> Any: return _resolve(value, _SOURCE_CLASSES, "source") - - -def resolve_format(value: Any) -> Any: - return _resolve(value, _FORMAT_CLASSES, "format") diff --git a/src/glassflow/etl/models/sources/__init__.py b/src/glassflow/etl/models/sources/__init__.py index 6a4b2f3..8b62a29 100644 --- a/src/glassflow/etl/models/sources/__init__.py +++ b/src/glassflow/etl/models/sources/__init__.py @@ -16,14 +16,16 @@ from ..registry import register_source from ..source import SourceBaseConfig, SourceBaseConfigPatch, SourceType -from .formats import JsonFormat, SourceFormat from .kafka import ( + AvroSchema, ConsumerGroupOffset, KafkaConnectionParams, KafkaConnectionParamsPatch, KafkaField, + KafkaFormat, KafkaMechanism, KafkaProtocol, + KafkaSchema, KafkaSource, KafkaSourcePatch, SchemaRegistry, @@ -66,12 +68,15 @@ "SourceBaseConfig", "SourceBaseConfigPatch", # Kafka + "AvroSchema", "ConsumerGroupOffset", "KafkaConnectionParams", "KafkaConnectionParamsPatch", "KafkaField", + "KafkaFormat", "KafkaMechanism", "KafkaProtocol", + "KafkaSchema", "KafkaSource", "KafkaSourcePatch", "SchemaRegistry", @@ -81,9 +86,6 @@ "OTLPSource", "OTLPSourcePatch", "OTLPTracesSource", - # Formats - "SourceFormat", - "JsonFormat", # Union "AnySource", "SourceConfig", diff --git a/src/glassflow/etl/models/sources/formats.py b/src/glassflow/etl/models/sources/formats.py deleted file mode 100644 index aed8c28..0000000 --- a/src/glassflow/etl/models/sources/formats.py +++ /dev/null @@ -1,46 +0,0 @@ -"""Source format models. - -A source's ``format`` describes how its payload is encoded and how the schema -is supplied. It is a tagged object discriminated by ``type`` (``json`` in OSS; -``avro``/``protobuf`` in Enterprise), with format-specific configuration nested -under a key named after the type, e.g.:: - - {"type": "protobuf", "protobuf": {"schema_source": "inline", "proto_text": "..."}} - -New formats are added by subclassing :class:`SourceFormat` and registering them -with :func:`glassflow.etl.models.registry.register_format`; no change to this -package is required. -""" - -from __future__ import annotations - -from typing import Literal - -from pydantic import BaseModel - -from ..registry import register_format - - -class SourceFormat(BaseModel): - """Base class for all source formats.""" - - type: str - - def validate_against_registry(self, has_schema_registry: bool) -> None: - """Hook for formats to enforce schema-source rules that depend on the - parent source's schema registry configuration. - - Called by the source's after-validator with whether the source has a - schema registry configured. The base implementation is a no-op; - formats that can read their schema from a registry (e.g. Enterprise - avro/protobuf) override this to require that the registry is present - when ``schema_source`` is ``"registry"``. - """ - return None - - -@register_format -class JsonFormat(SourceFormat): - """JSON-encoded payloads. This is the default and carries no extra config.""" - - type: Literal["json"] = "json" diff --git a/src/glassflow/etl/models/sources/kafka.py b/src/glassflow/etl/models/sources/kafka.py index fec07aa..04ef287 100644 --- a/src/glassflow/etl/models/sources/kafka.py +++ b/src/glassflow/etl/models/sources/kafka.py @@ -1,14 +1,24 @@ """Kafka source models.""" -from typing import Any, List, Literal, Optional +from typing import Any, Dict, List, Literal, Optional -from pydantic import BaseModel, Field, SerializeAsAny, field_validator, model_validator +from pydantic import ( + BaseModel, + ConfigDict, + Field, + model_serializer, + model_validator, +) from ..base import CaseInsensitiveStrEnum from ..data_types import KafkaDataType -from ..registry import resolve_format from ..source import SourceBaseConfig, SourceBaseConfigPatch, SourceType -from .formats import SourceFormat + + +class KafkaFormat(CaseInsensitiveStrEnum): + JSON = "json" + AVRO = "avro" + PROTOBUF = "protobuf" class KafkaProtocol(CaseInsensitiveStrEnum): @@ -46,6 +56,44 @@ class KafkaField(BaseModel): type: KafkaDataType +class AvroSchema(BaseModel): + """The Avro schema record (``schema.avsc``) for an Avro Kafka source. + + GlassFlow maps the record's root-level fields to ClickHouse columns, so the + top-level schema must be an Avro ``record`` with a ``name`` and a non-empty + ``fields`` list. Individual field ``type`` values may themselves be nested + Avro schemas. Other Avro keys (``namespace``, ``doc``, ``aliases``, ...) are + accepted and preserved on round-trip. + """ + + model_config = ConfigDict(extra="allow") + + type: Literal["record"] + name: str + fields: List[Dict[str, Any]] = Field(min_length=1) + namespace: Optional[str] = Field(default=None) + + +class KafkaSchema(BaseModel): + """Unified schema for a Kafka source. The shape used is selected by the + source's :class:`KafkaFormat`: + + - ``json`` -> ``fields`` (GlassFlow field declarations) + - ``avro`` -> ``avsc`` (the Avro schema record) + - ``protobuf`` -> ``proto`` (the ``.proto`` text) and ``message`` (the + message name within it) + + On reads the backend also returns ``parsed_fields``: the field list parsed + from the avsc/proto. It is read-only and is not sent back on create/edit. + """ + + fields: Optional[List[KafkaField]] = Field(default=None) + avsc: Optional[AvroSchema] = Field(default=None) + proto: Optional[str] = Field(default=None) + message: Optional[str] = Field(default=None) + parsed_fields: Optional[List[KafkaField]] = Field(default=None) + + class KafkaConnectionParams(BaseModel): brokers: List[str] protocol: KafkaProtocol @@ -74,6 +122,36 @@ def update(self, patch: "KafkaConnectionParamsPatch") -> "KafkaConnectionParams" return KafkaConnectionParams.model_validate(merged_dict) +def _parse_source_schema(data: Any) -> Any: + """Fold both wire schema shapes into the unified ``source_schema``: + + - top-level ``schema_fields`` (Open Source, and Enterprise for compatibility) + - the unified ``schema`` object (Enterprise: avsc / proto+message / + parsed_fields, and optionally fields) + + Also drops an empty ``schema_registry``. Left untouched if ``source_schema`` + is already supplied directly. + """ + if not isinstance(data, dict) or "source_schema" in data: + return data + data = dict(data) + if data.get("schema_registry", None) == {}: + data.pop("schema_registry", None) + + schema: Dict[str, Any] = {} + if "schema_fields" in data: + schema["fields"] = data.pop("schema_fields") + if "schema" in data: + wire = data.pop("schema") + if isinstance(wire, dict): + schema.update(wire) + else: + data["schema"] = wire # let validation reject a non-object schema + if schema: + data["source_schema"] = schema + return data + + class KafkaSource(SourceBaseConfig): """Kafka source configuration. @@ -81,31 +159,53 @@ class KafkaSource(SourceBaseConfig): and a single topic string. """ + model_config = ConfigDict(populate_by_name=True) + type: Literal[SourceType.KAFKA] = SourceType.KAFKA connection_params: KafkaConnectionParams topic: str consumer_group_initial_offset: ConsumerGroupOffset = ConsumerGroupOffset.LATEST schema_registry: Optional[SchemaRegistry] = Field(default=None) schema_version: Optional[str] = Field(default=None) - schema_fields: Optional[List[KafkaField]] = Field(default=None) - # Payload format. ``None`` means JSON (default) and is omitted from the - # serialized config. Concrete subclass is resolved from the format registry - # by its ``type``; SerializeAsAny preserves subclass-only fields on dump. - format: Optional[SerializeAsAny[SourceFormat]] = Field(default=None) + # Payload wire format. ``None`` means JSON (the backend default) and is + # omitted from the serialized config. ``avro`` and ``protobuf`` are + # Enterprise features and are rejected by an unlicensed backend. + format: Optional[KafkaFormat] = Field(default=None) + # All schema-related config in one place. Serialized back to the wire as + # top-level ``schema_fields`` (json, for Open Source / Enterprise compat) or + # the ``schema`` object (avro/protobuf); see the serializer below. + source_schema: Optional[KafkaSchema] = Field(default=None) @model_validator(mode="before") @classmethod - def validate_empty_schema_registry(cls, data: Any) -> Any: - if isinstance(data, dict): - if data.get("schema_registry", None) == {}: - data.pop("schema_registry", None) + def parse_schema(cls, data: Any) -> Any: + return _parse_source_schema(data) + + @model_serializer(mode="wrap") + def serialize_schema(self, handler: Any) -> Any: + """Emit json fields as top-level ``schema_fields`` (compatible with both + editions) and avro/protobuf as the ``schema`` object. ``parsed_fields`` + is read-only and is not emitted.""" + data = handler(self) + schema = data.pop("source_schema", None) + if schema: + if schema.get("fields") is not None: + data["schema_fields"] = schema["fields"] + inner = { + k: schema[k] + for k in ("avsc", "proto", "message") + if schema.get(k) is not None + } + if inner: + data["schema"] = inner return data - @field_validator("format", mode="before") - @classmethod - def resolve_format_field(cls, value: Any) -> Any: - """Dispatch a raw format dict to the concrete registered format class.""" - return resolve_format(value) + @property + def schema_fields(self) -> Optional[List[KafkaField]]: + """Backward-compatible accessor for the JSON field declarations, held + under ``schema.fields``. (``schema.parsed_fields`` is read-only backend + info and is intentionally not surfaced here.)""" + return self.source_schema.fields if self.source_schema else None @model_validator(mode="after") def validate_schema_registry_requires_version(self) -> "KafkaSource": @@ -117,11 +217,23 @@ def validate_schema_registry_requires_version(self) -> "KafkaSource": return self @model_validator(mode="after") - def validate_format(self) -> "KafkaSource": - """Let the format enforce schema-source rules against this source's - schema registry configuration.""" - if self.format is not None: - self.format.validate_against_registry(self.schema_registry is not None) + def validate_format_schema(self) -> "KafkaSource": + """The schema shape must match the declared format.""" + schema = self.source_schema + if self.format == KafkaFormat.AVRO: + if schema is None or schema.avsc is None: + raise ValueError("avro format requires schema.avsc") + elif self.format == KafkaFormat.PROTOBUF: + if schema is None or not schema.proto or not schema.message: + raise ValueError( + "protobuf format requires schema.proto and schema.message" + ) + elif schema is not None and ( + schema.avsc is not None or schema.proto or schema.message + ): + raise ValueError( + "schema.avsc / proto / message require format 'avro' or 'protobuf'" + ) return self def update(self, patch: "KafkaSourcePatch") -> "KafkaSource": @@ -136,8 +248,11 @@ def update(self, patch: "KafkaSourcePatch") -> "KafkaSource": if patch.topic is not None: update_dict.topic = patch.topic - if patch.schema_fields is not None: - update_dict.schema_fields = patch.schema_fields + if patch.format is not None: + update_dict.format = patch.format + + if patch.source_schema is not None: + update_dict.source_schema = patch.source_schema return update_dict @@ -162,6 +277,19 @@ class KafkaConnectionParamsPatch(BaseModel): class KafkaSourcePatch(SourceBaseConfigPatch): """Patch model for KafkaSource.""" + model_config = ConfigDict(populate_by_name=True) + connection_params: Optional[KafkaConnectionParamsPatch] = Field(default=None) topic: Optional[str] = Field(default=None) - schema_fields: Optional[List[KafkaField]] = Field(default=None) + format: Optional[KafkaFormat] = Field(default=None) + source_schema: Optional[KafkaSchema] = Field(default=None) + + @model_validator(mode="before") + @classmethod + def parse_schema(cls, data: Any) -> Any: + return _parse_source_schema(data) + + @property + def schema_fields(self) -> Optional[List[KafkaField]]: + """Backward-compatible accessor for ``schema.fields``.""" + return self.source_schema.fields if self.source_schema else None diff --git a/src/glassflow/etl/pipeline.py b/src/glassflow/etl/pipeline.py index 9ec4590..ae51586 100644 --- a/src/glassflow/etl/pipeline.py +++ b/src/glassflow/etl/pipeline.py @@ -448,9 +448,17 @@ def _request( ) from e except errors.UnprocessableContentError as e: self._track_event(event_name, error_type="InvalidPipelineConfig") + message = e.message or "Invalid pipeline configuration" + # The specific cause (e.g. an Avro/Protobuf schema compilation error) + # is in details.error; surface it instead of the generic message. + detail = e.details.get("error") if e.details else None + if detail: + message = f"{message}: {detail}" raise errors.PipelineInvalidConfigurationError( status_code=e.status_code, - message=e.message or "Invalid pipeline configuration", + message=message, + response=e.response, + details=e.details, ) from e except errors.APIError as e: self._track_event(event_name, error_type="InternalServerError") diff --git a/tests/test_models/test_source_formats.py b/tests/test_models/test_source_formats.py index add330f..6a70818 100644 --- a/tests/test_models/test_source_formats.py +++ b/tests/test_models/test_source_formats.py @@ -1,170 +1,226 @@ -"""Tests for source formats and the source/format registries. +"""Tests for Kafka source formats (json/avro/protobuf) and the source registry. -These prove the open-ended extension mechanism: an out-of-tree format or source -type (standing in for what the Enterprise SDK ships) plugs in via the registry -with no change to the OSS models, and SerializeAsAny preserves its fields on -dump. +The unified ``source_schema`` holds all schema config. It serializes json to a +top-level ``schema_fields`` (compatible with Open Source and Enterprise) and +avro/protobuf to the ``schema`` object (``avsc`` / ``proto``+``message``). On +reads the backend may also return ``schema.parsed_fields``. """ -from typing import Literal, Optional +from typing import Literal +from unittest.mock import patch import pytest -from pydantic import BaseModel, model_validator -from glassflow.etl import models +from glassflow.etl import errors, models from glassflow.etl.models import registry -from glassflow.etl.models.base import CaseInsensitiveStrEnum -from glassflow.etl.models.sources.formats import SourceFormat - -# --- Out-of-tree EE-like definitions (not part of OSS) ---------------------- +from tests.data import mock_responses + + +def _kafka(**overrides) -> dict: + base = { + "type": "kafka", + "source_id": "events", + "connection_params": {"brokers": ["b:9092"], "protocol": "PLAINTEXT"}, + "topic": "events", + "consumer_group_initial_offset": "earliest", + } + base.update(overrides) + return base + + +AVSC = { + "type": "record", + "name": "Event", + "namespace": "test", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "ts_ms", "type": "long"}, + ], +} +PROTO_TEXT = 'syntax = "proto3";\npackage test;\nmessage Event {\n string id = 1;\n}' -class SchemaSource(CaseInsensitiveStrEnum): - INLINE = "inline" - REGISTRY = "registry" +class TestJsonFormat: + def test_format_defaults_to_none_and_is_omitted(self): + src = models.KafkaSource.model_validate(_kafka()) + assert src.format is None + assert "format" not in src.model_dump(by_alias=True, exclude_none=True) + def test_top_level_schema_fields_round_trips(self): + wire = _kafka( + format="json", + schema_fields=[ + {"name": "id", "type": "string"}, + {"name": "ts_ms", "type": "int"}, + ], + ) + src = models.KafkaSource.model_validate(wire) + assert src.source_schema.fields[0].name == "id" + assert src.schema_fields[0].name == "id" # compat accessor + dumped = src.model_dump(by_alias=True, exclude_none=True) + # json always serializes to top-level schema_fields (OSS + EE compat) + assert dumped["schema_fields"] == wire["schema_fields"] + assert "schema" not in dumped + assert "source_schema" not in dumped -class ProtobufConfig(BaseModel): - schema_source: SchemaSource - proto_text: Optional[str] = None + def test_unified_schema_fields_input_also_accepted(self): + # EE may return json fields under the unified schema object. + src = models.KafkaSource.model_validate( + _kafka(format="json", schema={"fields": [{"name": "id", "type": "string"}]}) + ) + assert src.schema_fields[0].name == "id" + # still emits the compatible top-level form + dumped = src.model_dump(by_alias=True, exclude_none=True) + assert dumped["schema_fields"] == [{"name": "id", "type": "string"}] - @model_validator(mode="after") - def _require_inline_text(self): - if self.schema_source == SchemaSource.INLINE and not self.proto_text: - raise ValueError("proto_text is required when schema_source is 'inline'") - return self +class TestAvroFormat: + def test_avro_round_trips(self): + src = models.KafkaSource.model_validate( + _kafka(format="avro", schema={"avsc": AVSC}) + ) + assert src.format == models.KafkaFormat.AVRO + assert src.source_schema.avsc.name == "Event" + dumped = src.model_dump(by_alias=True, exclude_none=True) + assert dumped["schema"] == {"avsc": AVSC} + assert "schema_fields" not in dumped -class ProtobufFormat(SourceFormat): - type: Literal["protobuf"] = "protobuf" - protobuf: ProtobufConfig + def test_avro_requires_avsc(self): + with pytest.raises(ValueError, match="avro format requires schema.avsc"): + models.KafkaSource.model_validate(_kafka(format="avro")) - def validate_against_registry(self, has_schema_registry: bool) -> None: - if self.protobuf.schema_source == SchemaSource.REGISTRY and not ( - has_schema_registry - ): - raise ValueError( - "protobuf with schema_source 'registry' requires a schema registry " - "on the source" + def test_avsc_must_be_a_record_with_fields(self): + with pytest.raises(ValueError): + models.KafkaSource.model_validate( + _kafka(format="avro", schema={"avsc": {"type": "string", "name": "x"}}) + ) + with pytest.raises(ValueError): + models.KafkaSource.model_validate( + _kafka(format="avro", schema={"avsc": {"type": "record", "name": "E"}}) ) - -class KinesisSource(models.SourceBaseConfig): - type: Literal["kinesis"] = "kinesis" - stream_name: str - region: str - - -@pytest.fixture -def register_ee_types(): - """Register the out-of-tree types and clean up afterward.""" - registry.register_format(ProtobufFormat) - registry.register_source(KinesisSource) - yield - registry._FORMAT_CLASSES.pop("protobuf", None) - registry._SOURCE_CLASSES.pop("kinesis", None) + def test_avsc_preserves_extra_keys(self): + avsc = { + "type": "record", + "name": "Event", + "namespace": "test", + "doc": "an event", + "fields": [{"name": "meta", "type": {"type": "map", "values": "string"}}], + } + src = models.KafkaSource.model_validate( + _kafka(format="avro", schema={"avsc": avsc}) + ) + dumped = src.model_dump(by_alias=True, exclude_none=True) + assert dumped["schema"]["avsc"] == avsc -# The exact payload from the ticket discussion. -PROTOBUF_SOURCE = { - "type": "kafka", - "source_id": "events", - "connection_params": { - "brokers": ["kafka-controller-0.staging-cluster.glassflow.xyz:9094"], - "mechanism": "PLAIN", - "protocol": "SASL_PLAINTEXT", - "username": "glassflow", - "password": "secret", - }, - "topic": "test_proto_events_dedup", - "format": { - "type": "protobuf", - "protobuf": { - "schema_source": "inline", - "proto_text": 'syntax = "proto3";\nmessage Event {\n string id = 1;\n}', - }, - }, - "consumer_group_initial_offset": "earliest", -} +class TestProtobufFormat: + def test_protobuf_round_trips(self): + src = models.KafkaSource.model_validate( + _kafka(format="protobuf", schema={"proto": PROTO_TEXT, "message": "Event"}) + ) + assert src.format == models.KafkaFormat.PROTOBUF + assert src.source_schema.proto == PROTO_TEXT + assert src.source_schema.message == "Event" + dumped = src.model_dump(by_alias=True, exclude_none=True) + assert dumped["schema"] == {"proto": PROTO_TEXT, "message": "Event"} + def test_protobuf_requires_proto_and_message(self): + with pytest.raises(ValueError, match="schema.proto and schema.message"): + models.KafkaSource.model_validate( + _kafka(format="protobuf", schema={"proto": PROTO_TEXT}) + ) -class TestJsonFormat: - """OSS default format behaviour.""" - def test_format_defaults_to_none(self): +class TestParsedFields: + def test_parsed_fields_read_only_not_surfaced_or_emitted(self): + # Shape the backend returns for an avro source on GET. src = models.KafkaSource.model_validate( - { - "type": "kafka", - "source_id": "s1", - "connection_params": {"brokers": ["b:9092"], "protocol": "PLAINTEXT"}, - "topic": "t", - } + _kafka( + format="avro", + schema={ + "avsc": AVSC, + "parsed_fields": [ + {"name": "id", "type": "string"}, + {"name": "ts_ms", "type": "int"}, + ], + }, + ) ) - assert src.format is None - # Omitted from the serialized config so OSS payloads are unchanged. - assert "format" not in src.model_dump(by_alias=True, exclude_none=True) + # Available as informational backend output... + assert src.source_schema.parsed_fields[0].name == "id" + # ...but not surfaced via the schema_fields compat accessor (json only), + assert src.schema_fields is None + # ...and not emitted back on dump. + dumped = src.model_dump(by_alias=True, exclude_none=True) + assert dumped["schema"] == {"avsc": AVSC} + assert "parsed_fields" not in dumped["schema"] - def test_explicit_json_format_roundtrips(self): + def test_parsed_fields_on_json_get(self): + # GET of a json source: schema_fields (or schema.fields) plus parsed_fields. src = models.KafkaSource.model_validate( - { - "type": "kafka", - "source_id": "s1", - "connection_params": {"brokers": ["b:9092"], "protocol": "PLAINTEXT"}, - "topic": "t", - "format": {"type": "json"}, - } + _kafka( + format="json", + schema_fields=[{"name": "id", "type": "string"}], + schema={"parsed_fields": [{"name": "id", "type": "string"}]}, + ) ) - assert isinstance(src.format, models.JsonFormat) + assert src.schema_fields[0].name == "id" + assert src.source_schema.parsed_fields[0].name == "id" dumped = src.model_dump(by_alias=True, exclude_none=True) - assert dumped["format"] == {"type": "json"} + assert dumped["schema_fields"] == [{"name": "id", "type": "string"}] + assert "schema" not in dumped # parsed_fields not echoed back + - def test_unknown_format_raises(self): - with pytest.raises(ValueError, match="Unknown format type 'avro'"): +class TestSchemaFormatConsistency: + def test_avsc_without_avro_format_rejected(self): + with pytest.raises(ValueError, match="require format 'avro' or 'protobuf'"): models.KafkaSource.model_validate( - { - "type": "kafka", - "source_id": "s1", - "connection_params": { - "brokers": ["b:9092"], - "protocol": "PLAINTEXT", - }, - "topic": "t", - "format": {"type": "avro"}, - } + _kafka(format="json", schema={"avsc": AVSC}) ) -class TestRegisteredFormat: - """An out-of-tree format plugs in via the registry.""" +class TestSchemaErrorSurfacing: + """A backend 422 puts the specific cause in details.error; the SDK surfaces + it on the create/edit path instead of the generic message.""" - def test_protobuf_payload_roundtrips(self, register_ee_types): - src = models.KafkaSource.model_validate(PROTOBUF_SOURCE) + def test_invalid_schema_surfaces_backend_detail(self, pipeline, mock_track): + resp = mock_responses.create_mock_response_factory()( + status_code=422, + json_data={ + "status": 422, + "code": "unprocessable_entity", + "message": "failed to convert request to pipeline model", + "details": {"error": 'source "events": proto compilation error: boom'}, + }, + ) + with patch( + "httpx.Client.request", side_effect=resp.raise_for_status.side_effect + ): + with pytest.raises(errors.PipelineInvalidConfigurationError) as exc: + pipeline.create() - assert isinstance(src.format, ProtobufFormat) - assert src.format.protobuf.schema_source == SchemaSource.INLINE + assert "proto compilation error: boom" in str(exc.value) + assert exc.value.details["error"].startswith('source "events"') - # SerializeAsAny preserves the subclass-only nested config on dump, - # round-tripping to the exact wire shape. - dumped = src.model_dump(by_alias=True, exclude_none=True) - assert dumped["format"] == PROTOBUF_SOURCE["format"] - def test_protobuf_inline_requires_text(self, register_ee_types): - bad = {**PROTOBUF_SOURCE} - bad["format"] = {"type": "protobuf", "protobuf": {"schema_source": "inline"}} - with pytest.raises(ValueError, match="proto_text is required"): - models.KafkaSource.model_validate(bad) +# --- Source registry: an out-of-tree source type plugs in ------------------- - def test_protobuf_registry_source_requires_schema_registry(self, register_ee_types): - # schema_source 'registry' but no schema_registry on the source -> hook fires. - bad = {**PROTOBUF_SOURCE} - bad["format"] = {"type": "protobuf", "protobuf": {"schema_source": "registry"}} - with pytest.raises(ValueError, match="requires a schema registry"): - models.KafkaSource.model_validate(bad) +class KinesisSource(models.SourceBaseConfig): + type: Literal["kinesis"] = "kinesis" + stream_name: str + region: str -class TestRegisteredSourceType: - """An out-of-tree source type plugs in via the registry.""" +@pytest.fixture +def register_kinesis(): + registry.register_source(KinesisSource) + yield + registry._SOURCE_CLASSES.pop("kinesis", None) + + +class TestRegisteredSourceType: def _config(self, source: dict) -> dict: return { "pipeline_id": "p1", @@ -187,7 +243,7 @@ def _config(self, source: dict) -> dict: }, } - def test_kinesis_source_dispatches_and_roundtrips(self, register_ee_types): + def test_kinesis_dispatches_and_roundtrips(self, register_kinesis): kinesis = { "type": "kinesis", "source_id": "k1", @@ -195,12 +251,9 @@ def test_kinesis_source_dispatches_and_roundtrips(self, register_ee_types): "region": "eu-central-1", } cfg = models.PipelineConfig.model_validate(self._config(kinesis)) - assert isinstance(cfg.sources[0], KinesisSource) - # SerializeAsAny keeps kinesis-only fields through the base-typed field. dumped = cfg.model_dump(by_alias=True, exclude_none=True) assert dumped["sources"][0]["stream_name"] == "events" - assert dumped["sources"][0]["region"] == "eu-central-1" def test_unknown_source_type_raises(self): with pytest.raises(ValueError, match="Unknown source type 'pubsub'"):