feat(ee): Enterprise SDK — client, DLQ reprocessing, and Avro/Protobuf formats#68
Open
PabloPardoGarcia wants to merge 20 commits into
Open
feat(ee): Enterprise SDK — client, DLQ reprocessing, and Avro/Protobuf formats#68PabloPardoGarcia wants to merge 20 commits into
PabloPardoGarcia wants to merge 20 commits into
Conversation
… model layer Introduce `glassflow.ee` as a superset of `glassflow.etl`: - `ee.Client` / `ee.Pipeline` subclass the OSS classes; `Client` returns the EE `Pipeline` via a `_pipeline_class` seam so editions propagate without re-implementing construction. Lay the OSS foundation for open-ended, edition-extensible config models: - Add source and format registries (`models/registry.py`) with dispatch by `type` string, replacing the static discriminated union on `PipelineConfig.sources`. New source types and formats register themselves at import with no change to OSS models. - Add a `format` field to `KafkaSource` (tagged object: `json` in OSS, `avro`/`protobuf` to come in EE) with a polymorphic validation hook. `SerializeAsAny` preserves subclass-only fields on dump. - Add `_config_class` / `_config_patch_class` seams on `Pipeline` and make `validate_config` a classmethod so editions can use an extended config. - Keep a `_dlq_class` seam wired (pointing at the OSS DLQ) so the Enterprise DLQ can drop in via a follow-up PR with no OSS churn. DLQ-specific Enterprise capabilities (reprocessing/index consumption) are intentionally deferred to a follow-up PR (ETL-1187). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Contributor
Test Coverage Report
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Implements the SDK side of DLQ message processing (ETL-1187) on the Enterprise DLQ: - `ee.DLQ.list(batch_size, cursor)` — non-destructive, paginated read. Messages now carry `message_id` (NATS seq), `source`, and `received_at`. - `reprocess(message_ids)` / `reprocess_all()` — move messages back into the pipeline input (POST /dlq/reprocess, mode=selected|all). - `discard(message_ids)` / `discard_all()` — permanently remove (POST /dlq/discard, mode=selected|all). Deprecate the inherited operations whose endpoints are changing under EE: - `consume()` warns and delegates to `list()` (the /dlq/consume endpoint is being removed in favour of non-destructive /dlq/list), so callers keep working through the transition. - `purge()` warns and points to `discard_all()`; the legacy endpoint still works. message_ids are validated client-side (non-empty, <=1000) to mirror the backend and fail fast. A 403 from an unlicensed backend maps to FeatureNotLicensedError (re-added), which subclasses ForbiddenError so existing 403 handling still catches it. `ee.Pipeline` wires `_dlq_class` to the EE DLQ so `client.get_pipeline(id).dlq` exposes these methods. Contract per the DLQ Message Processing design doc / glassflow-etl-ee API. The backend reprocess/discard handlers and the consume->list rename are still in flight; the SDK is written against the agreed contract and tests mock the HTTP layer. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Reprocess replays messages through the running pipeline, so the API returns 409 Conflict when the pipeline is not in the Running state. Map it to a clear, catchable error instead of a generic APIError: - Add ConflictError (409) to the base client error mapping. - Add PipelineNotRunningError(ConflictError); reprocess/reprocess_all raise it on 409. Discard has no Running-state constraint, so its 409s stay ConflictError. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add an Enterprise Edition section covering the drop-in `glassflow.ee` client and DLQ management (list / reprocess / discard), including the FeatureNotLicensedError and PipelineNotRunningError behaviors and the 1000-id limit. Add a Features bullet for discoverability. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Align the EE DLQ list endpoint with glassflow-etl-ee#35 (ETL-1200) and make
iterating messages ergonomic:
- list() gains a `component` filter (ingestor/join/sink/dedup/oltp-receiver)
and now returns the paginated envelope {messages, next_cursor, has_more}
instead of a bare list; batch_size max raised to the backend's 1000.
- Add list_iter(), a lazy generator that pages via the cursor and yields
individual messages, so callers do not manage the cursor by hand.
- consume() (deprecated) unwraps the new envelope to keep its legacy list shape.
Update tests and the README accordingly.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add DLQ_COMPONENTS and reject an unknown `component` in list() with a ValueError before the request, instead of relying only on the server's 422. Verified against the live staging EE API (glassflow-etl-ee#35): valid components accepted, unknown rejected, and reprocess on a stopped pipeline returns 409 -> PipelineNotRunningError as mapped. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ocessing feat(ee): add DLQ list / reprocess / discard to the Enterprise client
Contributor
Test Coverage Report
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
…TL-1197) Add `format` (json/avro/protobuf) and a unified `schema` to the Kafka source, matching the agreed backend contract. - KafkaSchema groups all schema config: `fields` (json), `avsc` (Avro record), `proto` + `message` (protobuf), and read-only `parsed_fields` (returned by the backend on GET, never sent back and not used in SDK logic). AvroSchema enforces the structure GlassFlow needs to map to ClickHouse: a `record` with a `name` and non-empty `fields`; extra Avro keys are preserved. Full Avro correctness is left to the backend. - Compatibility across editions: input accepts both a top-level `schema_fields` (Open Source, and Enterprise for POST compat) and the unified `schema` object (Enterprise). On output, json serializes to top-level `schema_fields` (works on both editions) and avro/protobuf to the `schema` object. A `schema_fields` read property keeps the dedup/join validators and existing callers working. - Replace the speculative tagged-format foundation (formats.py + the format registry); the source-type registry is unchanged. Surface backend validation detail: APIError now carries the structured `details`, and the create/edit path composes details.error (e.g. a proto/avro compilation error) into PipelineInvalidConfigurationError. avro/protobuf are Enterprise features enforced by the backend; the SDK models them on the shared OSS source so OSS clients can also round-trip such configs. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…for-avro-and-protobuf feat: add Avro and Protobuf format support to Kafka source (ETL-1197)
Contributor
Test Coverage Report
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Contributor
Test Coverage Report
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Add ee.Pipeline.get_streams(), wrapping GET /api/v1/pipeline/{id}/streams. It
returns the pipeline's NATS JetStream streams as [{stream_name, component}],
useful for diagnosing NATS-level issues. 404 maps to PipelineNotFoundError and
403 to FeatureNotLicensedError. Enterprise-only, so it lives on ee.Pipeline.
Verified live against staging pipelines.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…o-get-pipeline-stream-names feat(ee): add Pipeline.get_streams for NATS stream names (ETL-1188)
Contributor
Test Coverage Report
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Update the Kafka source schema to the latest backend contract: - The Avro `.avsc` and Protobuf `.proto` schemas are unified into a single inline-string `schema.file`; the protobuf message name is `schema.message_type` (was `message`). The structural AvroSchema model is removed (the avsc is now an opaque string validated by the backend). - json fields now serialize to `schema.fields` (the unified object); the legacy top-level `schema_fields` is still accepted on input and upgraded to `schema.fields`. This removes the previous split serializer. - `schema.parsed_fields` remains read-only: parsed on GET, exposed for inspection, never sent back on create/edit. Validation: avro requires `schema.file`; protobuf requires `schema.file` and `schema.message_type`; using either with a non-avro/protobuf format is rejected.
Contributor
Test Coverage Report
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Establishes the GlassFlow Enterprise SDK and the Enterprise features built on it. Implements ETL-1186, and accumulates the work merged on top from #69 (ETL-1187, DLQ) and #70 (ETL-1197, Avro/Protobuf).
Enterprise client scaffold (ETL-1186)
ee.Client(etl.Client)andee.Pipeline(etl.Pipeline)subclass the Open Source classes;Clientreturns the EEPipelinevia a_pipeline_classseam, so editions propagate without re-implementing construction.from glassflow.ee import Clientis the single opt-in (same package today; can graduate to a separate licensed wheel later with no import change).models/registry.py):PipelineConfig.sourcesdispatches bytypeso new source types register without redefining a discriminated union. OSS sources register at import._config_class/_config_patch_classseams onPipeline;validate_configis a classmethod.DLQ message processing (ETL-1187)
On
ee.DLQ:list(batch_size, cursor, component)— non-destructive, paginated; returns{messages, next_cursor, has_more}.componentfilters by data-plane stage (ingestor/join/sink/dedup/oltp-receiver, validated client-side).list_iter(...)— lazy generator that pages via the cursor and yields messages.reprocess(ids)/reprocess_all()anddiscard(ids)/discard_all()—mode=selected|all,message_ids≤ 1000 (validated client-side).consume()→list(),purge()→discard_all().403→FeatureNotLicensedError; reprocess on a non-Running pipeline →409→PipelineNotRunningError(newConflictError).Avro and Protobuf formats (ETL-1197)
format(json/avro/protobuf) and a unifiedschema(KafkaSchema):fields(json),file(the inline.avsc/.prototext, shared by avro and protobuf),message_type(protobuf), and read-onlyparsed_fields(returned on GET, never sent back, not used in SDK logic). The avsc/proto are opaque strings validated by the backend.schemaobject (fields/file[+message_type]). The legacy top-levelschema_fieldsis deprecated but still accepted on input and upgraded toschema.fields.Pipeline streams (ETL-1188)
ee.Pipeline.get_streams()wrapsGET /api/v1/pipeline/{id}/streamsand returns the pipeline's NATS JetStream streams as[{"stream_name": ..., "component": ...}](component = the data-plane stage:ingestor/join/sink/dedup/dlq). Useful for NATS-level diagnostics (unblocks ETL-1101).404→PipelineNotFoundError,403→FeatureNotLicensedError./streamsendpoint is already deployed).Error detail
APIErrorcarries the structureddetails, and the create/edit path composesdetails.error(e.g. an Avro/Protobuf schema compilation error) intoPipelineInvalidConfigurationErrorinstead of the generic message.TODO
get_streams(ETL-1188) verified live against staging.main:get_pipelineround-tripsschema.avsc/proto+parsed_fields.list/list_iterwith real messages,componentfilter, andreprocess/discard(+ the409 → PipelineNotRunningErrorpath) against populated DLQs.Testing
255 passed, ruff clean. Unit-tested across the client/pipeline wiring, DLQ operations and deprecations, all schema input forms and round-trips, and error-detail surfacing.🤖 Generated with Claude Code