feat: add Avro and Protobuf format support to Kafka source (ETL-1197)#70
Merged
PabloPardoGarcia merged 1 commit intoJun 16, 2026
Conversation
…TL-1197) Add `format` (json/avro/protobuf) and a unified `schema` to the Kafka source, matching the agreed backend contract. - KafkaSchema groups all schema config: `fields` (json), `avsc` (Avro record), `proto` + `message` (protobuf), and read-only `parsed_fields` (returned by the backend on GET, never sent back and not used in SDK logic). AvroSchema enforces the structure GlassFlow needs to map to ClickHouse: a `record` with a `name` and non-empty `fields`; extra Avro keys are preserved. Full Avro correctness is left to the backend. - Compatibility across editions: input accepts both a top-level `schema_fields` (Open Source, and Enterprise for POST compat) and the unified `schema` object (Enterprise). On output, json serializes to top-level `schema_fields` (works on both editions) and avro/protobuf to the `schema` object. A `schema_fields` read property keeps the dedup/join validators and existing callers working. - Replace the speculative tagged-format foundation (formats.py + the format registry); the source-type registry is unchanged. Surface backend validation detail: APIError now carries the structured `details`, and the create/edit path composes details.error (e.g. a proto/avro compilation error) into PipelineInvalidConfigurationError. avro/protobuf are Enterprise features enforced by the backend; the SDK models them on the shared OSS source so OSS clients can also round-trip such configs. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Implements ETL-1197: adds
format(json/avro/protobuf) and a unifiedschemato the Kafka source, matching the agreed backend contract.Stacked on #68 (ETL-1186) — base is
pablo/etl-1186-add-ee-client-to-sdk, which already includes the merged DLQ reprocessing work (#69). The diff here is just the avro/protobuf model.Model
KafkaSchemagroups all schema config:fields(json),avsc(Avro record),proto+message(protobuf), and read-onlyparsed_fields(returned by the backend on GET).AvroSchemaenforces the structure GlassFlow needs to map to ClickHouse: arecordwith anameand non-emptyfields; extra Avro keys are preserved. Full Avro correctness is left to the backend.formats.py+ the format registry) from the EE-client branch; the source-type registry is unchanged.Edition compatibility
schema_fieldsorschema.fieldsschema_fields(works on OSS + EE)schema.avscschema.avscschema.proto+schema.messageschema.proto+schema.messageparsed_fieldsis parsed from GET responses and exposed assource.source_schema.parsed_fields, but is read-only: not used in any SDK logic and never sent back on create/edit. Aschema_fieldsread property keeps the dedup/join validators and existing callers working.Error detail
APIErrornow carries the structureddetails, and the create/edit path composesdetails.error(e.g. an Avro/Protobuf schema compilation error) intoPipelineInvalidConfigurationErrorinstead of the generic message.Validation
avro requires
schema.avsc; protobuf requiresschema.proto+schema.message;avsc/proto/messagewith a non-avro/protobuf format is rejected.Testing
255 passed, ruff clean. Unit-tested across all input forms, round-trips,parsed_fieldshandling, and the error-detail surfacing.Not yet live-tested: the unified schema contract isn't in the backend
mainyet, so end-to-end verification against staging is pending the backend release.🤖 Generated with Claude Code