Skip to content

feat(ee): Enterprise SDK — client, DLQ reprocessing, and Avro/Protobuf formats#68

Open
PabloPardoGarcia wants to merge 20 commits into
mainfrom
pablo/etl-1186-add-ee-client-to-sdk
Open

feat(ee): Enterprise SDK — client, DLQ reprocessing, and Avro/Protobuf formats#68
PabloPardoGarcia wants to merge 20 commits into
mainfrom
pablo/etl-1186-add-ee-client-to-sdk

Conversation

@PabloPardoGarcia

@PabloPardoGarcia PabloPardoGarcia commented Jun 11, 2026

Copy link
Copy Markdown
Member

What

Establishes the GlassFlow Enterprise SDK and the Enterprise features built on it. Implements ETL-1186, and accumulates the work merged on top from #69 (ETL-1187, DLQ) and #70 (ETL-1197, Avro/Protobuf).

from glassflow.ee import Client

client = Client(host="https://...")
pipeline = client.get_pipeline("my-pipeline")

Enterprise client scaffold (ETL-1186)

  • ee.Client(etl.Client) and ee.Pipeline(etl.Pipeline) subclass the Open Source classes; Client returns the EE Pipeline via a _pipeline_class seam, so editions propagate without re-implementing construction. from glassflow.ee import Client is the single opt-in (same package today; can graduate to a separate licensed wheel later with no import change).
  • Source-type registry (models/registry.py): PipelineConfig.sources dispatches by type so new source types register without redefining a discriminated union. OSS sources register at import.
  • _config_class / _config_patch_class seams on Pipeline; validate_config is a classmethod.

DLQ message processing (ETL-1187)

On ee.DLQ:

  • list(batch_size, cursor, component) — non-destructive, paginated; returns {messages, next_cursor, has_more}. component filters by data-plane stage (ingestor/join/sink/dedup/oltp-receiver, validated client-side).
  • list_iter(...) — lazy generator that pages via the cursor and yields messages.
  • reprocess(ids) / reprocess_all() and discard(ids) / discard_all()mode=selected|all, message_ids ≤ 1000 (validated client-side).
  • Deprecations: consume()list(), purge()discard_all().
  • Errors: 403FeatureNotLicensedError; reprocess on a non-Running pipeline → 409PipelineNotRunningError (new ConflictError).

Avro and Protobuf formats (ETL-1197)

  • Kafka source gains format (json/avro/protobuf) and a unified schema (KafkaSchema): fields (json), file (the inline .avsc/.proto text, shared by avro and protobuf), message_type (protobuf), and read-only parsed_fields (returned on GET, never sent back, not used in SDK logic). The avsc/proto are opaque strings validated by the backend.
  • Everything serializes under the schema object (fields / file [+ message_type]). The legacy top-level schema_fields is deprecated but still accepted on input and upgraded to schema.fields.

Pipeline streams (ETL-1188)

  • ee.Pipeline.get_streams() wraps GET /api/v1/pipeline/{id}/streams and returns the pipeline's NATS JetStream streams as [{"stream_name": ..., "component": ...}] (component = the data-plane stage: ingestor/join/sink/dedup/dlq). Useful for NATS-level diagnostics (unblocks ETL-1101). 404PipelineNotFoundError, 403FeatureNotLicensedError.
  • Verified live against staging (the /streams endpoint is already deployed).

Error detail

APIError carries the structured details, and the create/edit path composes details.error (e.g. an Avro/Protobuf schema compilation error) into PipelineInvalidConfigurationError instead of the generic message.

TODO

  • get_streams (ETL-1188) verified live against staging.
  • Test the remaining features end-to-end against a live cluster once the unified schema contract and DLQ endpoints are released to the backend main:
    • avro + protobuf: create pipelines via the SDK, produce encoded events to Kafka, verify rows land in ClickHouse, and confirm get_pipeline round-trips schema.avsc/proto + parsed_fields.
    • DLQ: list/list_iter with real messages, component filter, and reprocess/discard (+ the 409 → PipelineNotRunningError path) against populated DLQs.

Testing

255 passed, ruff clean. Unit-tested across the client/pipeline wiring, DLQ operations and deprecations, all schema input forms and round-trips, and error-detail surfacing.

🤖 Generated with Claude Code

… model layer

Introduce `glassflow.ee` as a superset of `glassflow.etl`:
- `ee.Client` / `ee.Pipeline` subclass the OSS classes; `Client` returns the
  EE `Pipeline` via a `_pipeline_class` seam so editions propagate without
  re-implementing construction.

Lay the OSS foundation for open-ended, edition-extensible config models:
- Add source and format registries (`models/registry.py`) with dispatch by
  `type` string, replacing the static discriminated union on
  `PipelineConfig.sources`. New source types and formats register themselves
  at import with no change to OSS models.
- Add a `format` field to `KafkaSource` (tagged object: `json` in OSS,
  `avro`/`protobuf` to come in EE) with a polymorphic validation hook.
  `SerializeAsAny` preserves subclass-only fields on dump.
- Add `_config_class` / `_config_patch_class` seams on `Pipeline` and make
  `validate_config` a classmethod so editions can use an extended config.
- Keep a `_dlq_class` seam wired (pointing at the OSS DLQ) so the Enterprise
  DLQ can drop in via a follow-up PR with no OSS churn.

DLQ-specific Enterprise capabilities (reprocessing/index consumption) are
intentionally deferred to a follow-up PR (ETL-1187).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@github-actions

Copy link
Copy Markdown
Contributor

Coverage

Test Coverage Report
FileStmtsMissCoverMissing
ee
   __init__.py40100% 
   client.py50100% 
   pipeline.py30100% 
etl
   __init__.py50100% 
   api_client.py64592%70–73, 123
   client.py47491%65–67, 188
   dlq.py41197%70
   errors.py280100% 
   pipeline.py152398%376, 450–451
   tracking.py260100% 
etl/models
   __init__.py100100% 
   base.py10190%9
   config.py34682%30–35
   data_types.py810100% 
   metadata.py40100% 
   pipeline.py1381390%65, 71, 76, 110, 115, 118, 137, 146, 171, 197, 210, 213, 216
   registry.py30196%33
   resources.py731184%33, 42–47, 93, 97, 122, 124
   sink.py77790%36, 44, 46, 81, 83, 85, 87
   source.py140100% 
etl/models/sources
   __init__.py140100% 
   formats.py110100% 
   kafka.py106298%66, 101
   otlp.py110100% 
etl/models/transforms
   __init__.py250100% 
   deduplication.py18194%16
   filter.py30100% 
   join.py51198%49
   stateless.py80100% 
TOTAL10935694% 

Implements the SDK side of DLQ message processing (ETL-1187) on the Enterprise
DLQ:
- `ee.DLQ.list(batch_size, cursor)` — non-destructive, paginated read. Messages
  now carry `message_id` (NATS seq), `source`, and `received_at`.
- `reprocess(message_ids)` / `reprocess_all()` — move messages back into the
  pipeline input (POST /dlq/reprocess, mode=selected|all).
- `discard(message_ids)` / `discard_all()` — permanently remove (POST
  /dlq/discard, mode=selected|all).

Deprecate the inherited operations whose endpoints are changing under EE:
- `consume()` warns and delegates to `list()` (the /dlq/consume endpoint is
  being removed in favour of non-destructive /dlq/list), so callers keep
  working through the transition.
- `purge()` warns and points to `discard_all()`; the legacy endpoint still works.

message_ids are validated client-side (non-empty, <=1000) to mirror the backend
and fail fast. A 403 from an unlicensed backend maps to FeatureNotLicensedError
(re-added), which subclasses ForbiddenError so existing 403 handling still
catches it. `ee.Pipeline` wires `_dlq_class` to the EE DLQ so
`client.get_pipeline(id).dlq` exposes these methods.

Contract per the DLQ Message Processing design doc / glassflow-etl-ee API. The
backend reprocess/discard handlers and the consume->list rename are still in
flight; the SDK is written against the agreed contract and tests mock the HTTP
layer.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
PabloPardoGarcia and others added 5 commits June 12, 2026 15:30
Reprocess replays messages through the running pipeline, so the API returns
409 Conflict when the pipeline is not in the Running state. Map it to a clear,
catchable error instead of a generic APIError:

- Add ConflictError (409) to the base client error mapping.
- Add PipelineNotRunningError(ConflictError); reprocess/reprocess_all raise it
  on 409. Discard has no Running-state constraint, so its 409s stay ConflictError.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add an Enterprise Edition section covering the drop-in `glassflow.ee` client
and DLQ management (list / reprocess / discard), including the
FeatureNotLicensedError and PipelineNotRunningError behaviors and the 1000-id
limit. Add a Features bullet for discoverability.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Align the EE DLQ list endpoint with glassflow-etl-ee#35 (ETL-1200) and make
iterating messages ergonomic:

- list() gains a `component` filter (ingestor/join/sink/dedup/oltp-receiver)
  and now returns the paginated envelope {messages, next_cursor, has_more}
  instead of a bare list; batch_size max raised to the backend's 1000.
- Add list_iter(), a lazy generator that pages via the cursor and yields
  individual messages, so callers do not manage the cursor by hand.
- consume() (deprecated) unwraps the new envelope to keep its legacy list shape.

Update tests and the README accordingly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add DLQ_COMPONENTS and reject an unknown `component` in list() with a ValueError
before the request, instead of relying only on the server's 422. Verified
against the live staging EE API (glassflow-etl-ee#35): valid components accepted,
unknown rejected, and reprocess on a stopped pipeline returns 409 ->
PipelineNotRunningError as mapped.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ocessing

feat(ee): add DLQ list / reprocess / discard to the Enterprise client
@github-actions

Copy link
Copy Markdown
Contributor

Coverage

Test Coverage Report
FileStmtsMissCoverMissing
ee
   __init__.py50100% 
   client.py50100% 
   dlq.py64198%213
   pipeline.py110100% 
etl
   __init__.py50100% 
   api_client.py66592%70–73, 125
   client.py47491%65–67, 188
   dlq.py41197%70
   errors.py310100% 
   pipeline.py152398%376, 450–451
   tracking.py260100% 
etl/models
   __init__.py100100% 
   base.py10190%9
   config.py34682%30–35
   data_types.py810100% 
   metadata.py40100% 
   pipeline.py1381390%65, 71, 76, 110, 115, 118, 137, 146, 171, 197, 210, 213, 216
   registry.py30196%33
   resources.py731184%33, 42–47, 93, 97, 122, 124
   sink.py77790%36, 44, 46, 81, 83, 85, 87
   source.py140100% 
etl/models/sources
   __init__.py140100% 
   formats.py110100% 
   kafka.py106298%66, 101
   otlp.py110100% 
etl/models/transforms
   __init__.py250100% 
   deduplication.py18194%16
   filter.py30100% 
   join.py51198%49
   stateless.py80100% 
TOTAL11715795% 

github-actions Bot and others added 2 commits June 15, 2026 16:40
…TL-1197)

Add `format` (json/avro/protobuf) and a unified `schema` to the Kafka source,
matching the agreed backend contract.

- KafkaSchema groups all schema config: `fields` (json), `avsc` (Avro record),
  `proto` + `message` (protobuf), and read-only `parsed_fields` (returned by the
  backend on GET, never sent back and not used in SDK logic). AvroSchema enforces
  the structure GlassFlow needs to map to ClickHouse: a `record` with a `name`
  and non-empty `fields`; extra Avro keys are preserved. Full Avro correctness is
  left to the backend.
- Compatibility across editions: input accepts both a top-level `schema_fields`
  (Open Source, and Enterprise for POST compat) and the unified `schema` object
  (Enterprise). On output, json serializes to top-level `schema_fields` (works on
  both editions) and avro/protobuf to the `schema` object. A `schema_fields` read
  property keeps the dedup/join validators and existing callers working.
- Replace the speculative tagged-format foundation (formats.py + the format
  registry); the source-type registry is unchanged.

Surface backend validation detail: APIError now carries the structured
`details`, and the create/edit path composes details.error (e.g. a proto/avro
compilation error) into PipelineInvalidConfigurationError.

avro/protobuf are Enterprise features enforced by the backend; the SDK models
them on the shared OSS source so OSS clients can also round-trip such configs.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…for-avro-and-protobuf

feat: add Avro and Protobuf format support to Kafka source (ETL-1197)
@github-actions

Copy link
Copy Markdown
Contributor

Coverage

Test Coverage Report
FileStmtsMissCoverMissing
ee
   __init__.py50100% 
   client.py50100% 
   dlq.py64198%213
   pipeline.py110100% 
etl
   __init__.py50100% 
   api_client.py68691%71–75, 139
   client.py47491%65–67, 188
   dlq.py41197%70
   errors.py320100% 
   pipeline.py156199%376
   tracking.py260100% 
etl/models
   __init__.py100100% 
   base.py10190%9
   config.py34682%30–35
   data_types.py810100% 
   metadata.py40100% 
   pipeline.py1381390%65, 71, 76, 110, 115, 118, 137, 146, 171, 197, 210, 213, 216
   registry.py24195%30
   resources.py731184%33, 42–47, 93, 97, 122, 124
   sink.py77790%36, 44, 46, 81, 83, 85, 87
   source.py140100% 
etl/models/sources
   __init__.py130100% 
   kafka.py163696%114, 136, 139, 149, 252, 295
   otlp.py110100% 
etl/models/transforms
   __init__.py250100% 
   deduplication.py18194%16
   filter.py30100% 
   join.py51198%49
   stateless.py80100% 
TOTAL12176095% 

@PabloPardoGarcia PabloPardoGarcia changed the title feat(ee): add Enterprise client scaffold and extensible source/format model layer feat(ee): Enterprise SDK — client, DLQ reprocessing, and Avro/Protobuf formats Jun 16, 2026
@github-actions

Copy link
Copy Markdown
Contributor

Coverage

Test Coverage Report
FileStmtsMissCoverMissing
glassflow/ee
   __init__.py50100% 
   client.py50100% 
   dlq.py64198%213
   pipeline.py110100% 
glassflow/etl
   __init__.py50100% 
   api_client.py68691%71–75, 139
   client.py47491%65–67, 188
   dlq.py41197%70
   errors.py320100% 
   pipeline.py156199%376
   tracking.py260100% 
glassflow/etl/models
   __init__.py100100% 
   base.py10190%9
   config.py34682%30–35
   data_types.py810100% 
   metadata.py40100% 
   pipeline.py1381390%65, 71, 76, 110, 115, 118, 137, 146, 171, 197, 210, 213, 216
   registry.py24195%30
   resources.py731184%33, 42–47, 93, 97, 122, 124
   sink.py77790%36, 44, 46, 81, 83, 85, 87
   source.py140100% 
glassflow/etl/models/sources
   __init__.py130100% 
   kafka.py163696%114, 136, 139, 149, 252, 295
   otlp.py110100% 
glassflow/etl/models/transforms
   __init__.py250100% 
   deduplication.py18194%16
   filter.py30100% 
   join.py51198%49
   stateless.py80100% 
TOTAL12176095% 

github-actions Bot and others added 2 commits June 16, 2026 14:03
Add ee.Pipeline.get_streams(), wrapping GET /api/v1/pipeline/{id}/streams. It
returns the pipeline's NATS JetStream streams as [{stream_name, component}],
useful for diagnosing NATS-level issues. 404 maps to PipelineNotFoundError and
403 to FeatureNotLicensedError. Enterprise-only, so it lives on ee.Pipeline.

Verified live against staging pipelines.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…o-get-pipeline-stream-names

feat(ee): add Pipeline.get_streams for NATS stream names (ETL-1188)
@github-actions

Copy link
Copy Markdown
Contributor

Coverage

Test Coverage Report
FileStmtsMissCoverMissing
ee
   __init__.py50100% 
   client.py50100% 
   dlq.py64198%213
   pipeline.py210100% 
etl
   __init__.py50100% 
   api_client.py68691%71–75, 139
   client.py47491%65–67, 188
   dlq.py41197%70
   errors.py320100% 
   pipeline.py156199%376
   tracking.py260100% 
etl/models
   __init__.py100100% 
   base.py10190%9
   config.py34682%30–35
   data_types.py810100% 
   metadata.py40100% 
   pipeline.py1381390%65, 71, 76, 110, 115, 118, 137, 146, 171, 197, 210, 213, 216
   registry.py24195%30
   resources.py731184%33, 42–47, 93, 97, 122, 124
   sink.py77790%36, 44, 46, 81, 83, 85, 87
   source.py140100% 
etl/models/sources
   __init__.py130100% 
   kafka.py163696%114, 136, 139, 149, 252, 295
   otlp.py110100% 
etl/models/transforms
   __init__.py250100% 
   deduplication.py18194%16
   filter.py30100% 
   join.py51198%49
   stateless.py80100% 
TOTAL12276095% 

github-actions Bot and others added 2 commits June 16, 2026 14:17
Update the Kafka source schema to the latest backend contract:

- The Avro `.avsc` and Protobuf `.proto` schemas are unified into a single
  inline-string `schema.file`; the protobuf message name is `schema.message_type`
  (was `message`). The structural AvroSchema model is removed (the avsc is now an
  opaque string validated by the backend).
- json fields now serialize to `schema.fields` (the unified object); the legacy
  top-level `schema_fields` is still accepted on input and upgraded to
  `schema.fields`. This removes the previous split serializer.
- `schema.parsed_fields` remains read-only: parsed on GET, exposed for
  inspection, never sent back on create/edit.

Validation: avro requires `schema.file`; protobuf requires `schema.file` and
`schema.message_type`; using either with a non-avro/protobuf format is rejected.
@github-actions

Copy link
Copy Markdown
Contributor

Coverage

Test Coverage Report
FileStmtsMissCoverMissing
glassflow/ee
   __init__.py50100% 
   client.py50100% 
   dlq.py64198%213
   pipeline.py210100% 
glassflow/etl
   __init__.py50100% 
   api_client.py68691%71–75, 139
   client.py47491%65–67, 188
   dlq.py41197%70
   errors.py320100% 
   pipeline.py156199%376
   tracking.py260100% 
glassflow/etl/models
   __init__.py100100% 
   base.py10190%9
   config.py34682%30–35
   data_types.py810100% 
   metadata.py40100% 
   pipeline.py1381390%65, 71, 76, 110, 115, 118, 137, 146, 171, 197, 210, 213, 216
   registry.py24195%30
   resources.py731184%33, 42–47, 93, 97, 122, 124
   sink.py77790%36, 44, 46, 81, 83, 85, 87
   source.py140100% 
glassflow/etl/models/sources
   __init__.py130100% 
   kafka.py141596%90, 108, 111, 198, 241
   otlp.py110100% 
glassflow/etl/models/transforms
   __init__.py250100% 
   deduplication.py18194%16
   filter.py30100% 
   join.py51198%49
   stateless.py80100% 
TOTAL12055995% 

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant