From c95b6df4ecd701e3725375a6c69a9adaa37cfdd7 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Tue, 9 Sep 2025 13:46:31 +0200 Subject: [PATCH 1/3] add extra supported data types --- src/glassflow/etl/models/data_types.py | 29 ++++++++++++++++++++++++++ src/glassflow/etl/models/source.py | 8 ++----- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/src/glassflow/etl/models/data_types.py b/src/glassflow/etl/models/data_types.py index 15367c1..a915f95 100644 --- a/src/glassflow/etl/models/data_types.py +++ b/src/glassflow/etl/models/data_types.py @@ -3,10 +3,17 @@ class KafkaDataType(CaseInsensitiveStrEnum): STRING = "string" + INT = "int" INT8 = "int8" INT16 = "int16" INT32 = "int32" INT64 = "int64" + UINT = "uint" + UINT8 = "uint8" + UINT16 = "uint16" + UINT32 = "uint32" + UINT64 = "uint64" + FLOAT = "float" FLOAT32 = "float32" FLOAT64 = "float64" BOOL = "bool" @@ -19,6 +26,10 @@ class ClickhouseDataType(CaseInsensitiveStrEnum): INT16 = "Int16" INT32 = "Int32" INT64 = "Int64" + UINT8 = "UInt8" + UINT16 = "UInt16" + UINT32 = "UInt32" + UINT64 = "UInt64" FLOAT32 = "Float32" FLOAT64 = "Float64" STRING = "String" @@ -62,6 +73,12 @@ class ClickhouseDataType(CaseInsensitiveStrEnum): ClickhouseDataType.LC_FIXEDSTRING, ClickhouseDataType.LC_DATETIME, ], + KafkaDataType.INT: [ + ClickhouseDataType.INT8, + ClickhouseDataType.INT16, + ClickhouseDataType.INT32, + ClickhouseDataType.INT64, + ], KafkaDataType.INT8: [ClickhouseDataType.INT8, ClickhouseDataType.LC_INT8], KafkaDataType.INT16: [ClickhouseDataType.INT16, ClickhouseDataType.LC_INT16], KafkaDataType.INT32: [ClickhouseDataType.INT32, ClickhouseDataType.LC_INT32], @@ -72,6 +89,18 @@ class ClickhouseDataType(CaseInsensitiveStrEnum): ClickhouseDataType.LC_INT64, ClickhouseDataType.LC_DATETIME, ], + KafkaDataType.UINT: [ + ClickhouseDataType.UINT8, + ClickhouseDataType.UINT16, + ClickhouseDataType.UINT32, + ClickhouseDataType.UINT64, + ], + KafkaDataType.UINT8: [ClickhouseDataType.UINT8], + KafkaDataType.UINT16: [ClickhouseDataType.UINT16], + KafkaDataType.UINT32: [ClickhouseDataType.UINT32], + KafkaDataType.UINT64: [ClickhouseDataType.UINT64], + KafkaDataType.UINT8: [ClickhouseDataType.UINT8], + KafkaDataType.FLOAT: [ClickhouseDataType.FLOAT64], KafkaDataType.FLOAT32: [ClickhouseDataType.FLOAT32, ClickhouseDataType.LC_FLOAT32], KafkaDataType.FLOAT64: [ ClickhouseDataType.FLOAT64, diff --git a/src/glassflow/etl/models/source.py b/src/glassflow/etl/models/source.py index f2c8ff2..0750c01 100644 --- a/src/glassflow/etl/models/source.py +++ b/src/glassflow/etl/models/source.py @@ -66,13 +66,9 @@ def validate_deduplication_fields(cls, values): # Validate id_field_type is a valid type when enabled id_field_type = values.get("id_field_type") - if id_field_type not in [ - KafkaDataType.STRING, - KafkaDataType.INT32, - KafkaDataType.INT64, - ]: + if id_field_type not in [KafkaDataType.STRING, KafkaDataType.INT]: raise ValueError( - "id_field_type must be a string, int32, or int64 when " + "id_field_type must be a string or int when " "deduplication is enabled" ) From cc794e34ef99ccedc9ec409addce8879b435fc57 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 9 Sep 2025 11:47:36 +0000 Subject: [PATCH 2/3] chore: bump version to 3.0.1 --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 4a36342..cb2b00e 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.0.0 +3.0.1 From 9379aa2599ce63f02c5f3a038fbcd3bffec47fbf Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Tue, 9 Sep 2025 16:13:23 +0200 Subject: [PATCH 3/3] remove duplication key type check --- src/glassflow/etl/models/source.py | 7 ------- tests/test_models/test_topic.py | 20 -------------------- 2 files changed, 27 deletions(-) diff --git a/src/glassflow/etl/models/source.py b/src/glassflow/etl/models/source.py index 0750c01..a4ec7b5 100644 --- a/src/glassflow/etl/models/source.py +++ b/src/glassflow/etl/models/source.py @@ -117,13 +117,6 @@ def validate_deduplication_id_field( "the event schema" ) - # Check if the field type matches the deduplication ID field type - if field.type.value != v.id_field_type.value: - raise ValueError( - f"Deduplication ID field type '{v.id_field_type.value}' does not match " - f"schema field type '{field.type.value}' for field '{v.id_field}'" - ) - return v diff --git a/tests/test_models/test_topic.py b/tests/test_models/test_topic.py index b63af81..319dc12 100644 --- a/tests/test_models/test_topic.py +++ b/tests/test_models/test_topic.py @@ -53,26 +53,6 @@ def test_topic_config_deduplication_id_field_validation(self): ) assert "does not exist in the event schema" in str(exc_info.value) - # Test with mismatched field type - with pytest.raises(ValueError) as exc_info: - models.TopicConfig( - name="test-topic", - consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST, - schema=models.Schema( - type=models.SchemaType.JSON, - fields=[ - models.SchemaField(name="id", type=models.KafkaDataType.INT64), - ], - ), - deduplication=models.DeduplicationConfig( - enabled=True, - id_field="id", - id_field_type=models.KafkaDataType.STRING, - time_window="1h", - ), - ) - assert "does not match schema field type" in str(exc_info.value) - # Test with disabled deduplication (should not validate) config = models.TopicConfig( name="test-topic",