Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 31 additions & 16 deletions src/glassflow/etl/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,66 +67,81 @@ def _raise_api_error(response: httpx.Response) -> None:
error_data = response.json()
message = error_data.get("message", None)
code = error_data.get("code", None)
details = error_data.get("details", None)
except json.JSONDecodeError:
message = f"{status_code} {response.reason_phrase}"
code = None
details = None
error_data = {}

if status_code == 400:
# Handle specific status validation error codes
if code == "TERMINAL_STATE_VIOLATION":
raise errors.TerminalStateViolationError(
status_code, message, response=response
status_code, message, response=response, details=details
)
elif code == "INVALID_STATUS_TRANSITION":
raise errors.InvalidStatusTransitionError(
status_code,
message,
response=response,
status_code, message, response=response, details=details
)
elif code == "UNKNOWN_STATUS":
raise errors.UnknownStatusError(status_code, message, response=response)
raise errors.UnknownStatusError(
status_code, message, response=response, details=details
)
elif code == "PIPELINE_ALREADY_IN_STATE":
raise errors.PipelineAlreadyInStateError(
status_code, message, response=response
status_code, message, response=response, details=details
)
elif code == "PIPELINE_IN_TRANSITION":
raise errors.PipelineInTransitionError(
status_code, message, response=response
status_code, message, response=response, details=details
)
elif message and message.startswith("invalid json:"):
raise errors.InvalidJsonError(status_code, message, response=response)
raise errors.InvalidJsonError(
status_code, message, response=response, details=details
)
elif message and message == "pipeline id cannot be empty":
raise errors.EmptyPipelineIdError(
status_code, message, response=response
status_code, message, response=response, details=details
)
elif message and message.startswith(
"pipeline can only be deleted if it's stopped or terminated"
):
raise errors.PipelineDeletionStateViolationError(
status_code, message, response=response
status_code, message, response=response, details=details
)
else:
# Generic 400 error for unknown codes
raise errors.ValidationError(status_code, message, response=response)
raise errors.ValidationError(
status_code, message, response=response, details=details
)
elif status_code == 403:
raise errors.ForbiddenError(status_code, message, response=response)
raise errors.ForbiddenError(
status_code, message, response=response, details=details
)
elif status_code == 404:
raise errors.NotFoundError(status_code, message, response=response)
raise errors.NotFoundError(
status_code, message, response=response, details=details
)
elif status_code == 409:
raise errors.ConflictError(status_code, message, response=response)
raise errors.ConflictError(
status_code, message, response=response, details=details
)
elif status_code == 422:
raise errors.UnprocessableContentError(
status_code, message, response=response
status_code, message, response=response, details=details
)
elif status_code == 500:
raise errors.ServerError(status_code, message, response=response)
raise errors.ServerError(
status_code, message, response=response, details=details
)
else:
raise errors.APIError(
status_code,
message="An error occurred: "
f"({status_code} {response.reason_phrase}) {message}",
response=response,
details=details,
)

def _track_event(self, event_name: str, **kwargs: Any) -> None:
Expand Down
6 changes: 5 additions & 1 deletion src/glassflow/etl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ class ConnectionError(RequestError):
class APIError(GlassFlowError):
"""Base for API response errors."""

def __init__(self, status_code, message=None, response=None):
def __init__(self, status_code, message=None, response=None, details=None):
self.status_code = status_code
self.response = response
self.message = message
# The API's structured ``details`` object, when present. For invalid
# configs it carries the specific cause under ``details["error"]`` (for
# example an Avro/Protobuf schema compilation error).
self.details = details or {}
super().__init__(self.message)


Expand Down
10 changes: 6 additions & 4 deletions src/glassflow/etl/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
from .source import SourceBaseConfig, SourceBaseConfigPatch, SourceType
from .sources import (
AnySource,
AvroSchema,
ConsumerGroupOffset,
JsonFormat,
KafkaConnectionParams,
KafkaConnectionParamsPatch,
KafkaField,
KafkaFormat,
KafkaMechanism,
KafkaProtocol,
KafkaSchema,
KafkaSource,
KafkaSourcePatch,
OTLPLogsSource,
Expand All @@ -42,7 +44,6 @@
OTLPTracesSource,
SchemaRegistry,
SourceConfig,
SourceFormat,
)
from .transforms import (
DedupTransform,
Expand All @@ -64,6 +65,7 @@

__all__ = [
"AnySource",
"AvroSchema",
"ClickhouseConnectionParams",
"ClickhouseConnectionParamsPatch",
"ClickhouseDataType",
Expand All @@ -77,15 +79,16 @@
"JoinConfig",
"JoinConfigPatch",
"JoinOutputField",
"JsonFormat",
"JoinSourceConfig",
"JoinType",
"KafkaConnectionParams",
"KafkaConnectionParamsPatch",
"KafkaDataType",
"KafkaField",
"KafkaFormat",
"KafkaMechanism",
"KafkaProtocol",
"KafkaSchema",
"KafkaSource",
"KafkaSourcePatch",
"MetadataConfig",
Expand All @@ -109,7 +112,6 @@
"SourceBaseConfig",
"SourceBaseConfigPatch",
"SourceConfig",
"SourceFormat",
"SourceResourceEntry",
"SourceType",
"StatelessTransform",
Expand Down
24 changes: 5 additions & 19 deletions src/glassflow/etl/models/registry.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
"""Extensible registries for source types and source formats.
"""Extensible registry for source types.

Source types (``kafka``, ``otlp.logs``, ...) and source formats (``json``,
and Enterprise ``avro``/``protobuf``) are open-ended: the Enterprise SDK adds
Source types (``kafka``, ``otlp.logs``, ...) are open-ended: an edition can add
new ones without modifying this package. Instead of a static Pydantic
discriminated union (whose members are frozen at class-definition time), models
annotate their fields with the base type and dispatch each value to the
concrete class by its ``type`` string at validation time, looked up here.
annotate their field with the base type and dispatch each value to the concrete
class by its ``type`` string at validation time, looked up here.

Editions register their classes at import::

from glassflow.etl.models.registry import register_source, register_format
from glassflow.etl.models.registry import register_source
register_source(KinesisSource)
register_format(AvroFormat)
"""

from __future__ import annotations
Expand All @@ -23,7 +21,6 @@
_T = TypeVar("_T", bound=BaseModel)

_SOURCE_CLASSES: Dict[str, Type[BaseModel]] = {}
_FORMAT_CLASSES: Dict[str, Type[BaseModel]] = {}


def _type_key(cls: Type[BaseModel]) -> str:
Expand All @@ -44,13 +41,6 @@ def register_source(cls: Type[_T]) -> Type[_T]:
return cls


def register_format(cls: Type[_T]) -> Type[_T]:
"""Register a source format class, keyed by its ``type`` default. Usable as
a decorator."""
_FORMAT_CLASSES[_type_key(cls)] = cls
return cls


def _resolve(value: Any, registry: Dict[str, Type[BaseModel]], kind: str) -> Any:
"""Coerce a raw dict to the registered concrete model by its ``type``.

Expand All @@ -69,7 +59,3 @@ def _resolve(value: Any, registry: Dict[str, Type[BaseModel]], kind: str) -> Any

def resolve_source(value: Any) -> Any:
return _resolve(value, _SOURCE_CLASSES, "source")


def resolve_format(value: Any) -> Any:
return _resolve(value, _FORMAT_CLASSES, "format")
10 changes: 6 additions & 4 deletions src/glassflow/etl/models/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

from ..registry import register_source
from ..source import SourceBaseConfig, SourceBaseConfigPatch, SourceType
from .formats import JsonFormat, SourceFormat
from .kafka import (
AvroSchema,
ConsumerGroupOffset,
KafkaConnectionParams,
KafkaConnectionParamsPatch,
KafkaField,
KafkaFormat,
KafkaMechanism,
KafkaProtocol,
KafkaSchema,
KafkaSource,
KafkaSourcePatch,
SchemaRegistry,
Expand Down Expand Up @@ -66,12 +68,15 @@
"SourceBaseConfig",
"SourceBaseConfigPatch",
# Kafka
"AvroSchema",
"ConsumerGroupOffset",
"KafkaConnectionParams",
"KafkaConnectionParamsPatch",
"KafkaField",
"KafkaFormat",
"KafkaMechanism",
"KafkaProtocol",
"KafkaSchema",
"KafkaSource",
"KafkaSourcePatch",
"SchemaRegistry",
Expand All @@ -81,9 +86,6 @@
"OTLPSource",
"OTLPSourcePatch",
"OTLPTracesSource",
# Formats
"SourceFormat",
"JsonFormat",
# Union
"AnySource",
"SourceConfig",
Expand Down
46 changes: 0 additions & 46 deletions src/glassflow/etl/models/sources/formats.py

This file was deleted.

Loading