Skip to content

Commit 7b9277e

Browse files
treysp and georgesittas authored
Fix(clickhouse): support non-datetime time column partitioning (#3357)
Co-authored-by: Jo <46752250+georgesittas@users.noreply.github.com>
1 parent 9e1b34b commit 7b9277e

File tree

3 files changed

+110
-27
lines changed

3 files changed

+110
-27
lines changed

sqlmesh/core/model/definition.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,20 @@ def full_depends_on(self) -> t.Set[str]:
982982

983983
return self._full_depends_on
984984

985+
@property
986+
def partitioned_by(self) -> t.List[exp.Expression]:
987+
"""Columns to partition the model by, including the time column if it is not already included."""
988+
if self.time_column and self.time_column.column not in {
989+
col for expr in self.partitioned_by_ for col in expr.find_all(exp.Column)
990+
}:
991+
return [
992+
TIME_COL_PARTITION_FUNC.get(self.dialect, lambda x, y: x)(
993+
self.time_column.column, self.columns_to_types
994+
),
995+
*self.partitioned_by_,
996+
]
997+
return self.partitioned_by_
998+
985999

9861000
class _SqlBasedModel(_Model):
9871001
inline_audits_: t.Dict[str, t.Any] = Field(default={}, alias="inline_audits")
@@ -2456,3 +2470,45 @@ def _meta_renderer(
24562470
def get_model_name(path: Path) -> str:
24572471
path_parts = list(path.parts[path.parts.index("models") + 1 : -1]) + [path.stem]
24582472
return ".".join(path_parts[-3:])
2473+
2474+
2475+
# Function applied to the time column when it is automatically added to the
# partitioning key of INCREMENTAL_BY_TIME_RANGE models.
def clickhouse_partition_func(
    column: exp.Expression, columns_to_types: t.Optional[t.Dict[str, exp.DataType]]
) -> exp.Expression:
    """Build a ClickHouse ``toMonday(...)`` partition expression for ``column``.

    ``toMonday()`` only accepts Date/DateTime-family arguments. A column of a
    conformable type is passed through unchanged; otherwise the input is cast
    to ``DateTime64('UTC')`` first, and a known non-conformable type is
    additionally cast back to its original type afterwards.
    """
    col_type = exp.DataType.build("UNKNOWN")
    if columns_to_types:
        col_type = columns_to_types.get(column.name) or col_type

    # Conformable types: `toMonday()` accepts them directly.
    if col_type.is_type(
        exp.DataType.Type.DATE,
        exp.DataType.Type.DATE32,
        exp.DataType.Type.DATETIME,
        exp.DataType.Type.DATETIME64,
    ):
        return exp.func("toMonday", column, dialect="clickhouse")

    # Non-conformable or unknown type: cast the input so `toMonday()` accepts it.
    monday_of_cast = exp.func(
        "toMonday",
        exp.cast(column, exp.DataType.build("DateTime64('UTC')", dialect="clickhouse")),
        dialect="clickhouse",
    )

    if col_type.is_type(exp.DataType.Type.UNKNOWN):
        return monday_of_cast

    # Known but non-conformable type: cast the result back to the original type.
    return exp.cast(monday_of_cast, col_type)
# Maps dialect name -> function applied to the time column when it is
# automatically added to a model's partitioning key (see
# `clickhouse_partition_func` for the ClickHouse transform).
TIME_COL_PARTITION_FUNC = {"clickhouse": clickhouse_partition_func}

sqlmesh/core/model/meta.py

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -367,18 +367,6 @@ def unique_key(self) -> t.List[exp.Expression]:
367367
return self.kind.unique_key
368368
return []
369369

370-
@property
371-
def partitioned_by(self) -> t.List[exp.Expression]:
372-
"""Columns to partition the model by, including the time column if it is not already included."""
373-
if self.time_column and self.time_column.column not in [
374-
col for col in self._partition_by_columns
375-
]:
376-
return [
377-
TIME_COL_PARTITION_FUNC.get(self.dialect, lambda x: x)(self.time_column.column),
378-
*self.partitioned_by_,
379-
]
380-
return self.partitioned_by_
381-
382370
@property
383371
def clustered_by(self) -> t.List[exp.Expression]:
384372
return self.clustered_by_ or []
@@ -446,10 +434,6 @@ def all_references(self) -> t.List[Reference]:
446434
Reference(model_name=self.name, expression=e, unique=True) for e in self.references
447435
]
448436

449-
@property
450-
def _partition_by_columns(self) -> t.List[exp.Column]:
451-
return [col for expr in self.partitioned_by_ for col in expr.find_all(exp.Column)]
452-
453437
@property
454438
def managed_columns(self) -> t.Dict[str, exp.DataType]:
455439
return getattr(self.kind, "managed_columns", {})
@@ -478,12 +462,3 @@ def fqn(self) -> str:
478462
@property
479463
def on_destructive_change(self) -> OnDestructiveChange:
480464
return getattr(self.kind, "on_destructive_change", OnDestructiveChange.ALLOW)
481-
482-
483-
# function applied to time column when automatically used for partitioning in
484-
# INCREMENTAL_BY_TIME_RANGE models
485-
TIME_COL_PARTITION_FUNC = {
486-
"clickhouse": lambda x: exp.func(
487-
"toMonday", exp.cast(x, exp.DataType.build("DateTime64", dialect="clickhouse"))
488-
)
489-
}

tests/core/engine_adapter/test_clickhouse.py

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,7 @@ def build_properties_sql(storage_format="", order_by="", primary_key="", propert
350350

351351

352352
def test_partitioned_by_expr(make_mocked_engine_adapter: t.Callable):
353+
# user doesn't specify, unknown time column type
353354
model = load_sql_based_model(
354355
parse(
355356
"""
@@ -369,8 +370,11 @@ def test_partitioned_by_expr(make_mocked_engine_adapter: t.Callable):
369370
)
370371
)
371372

372-
assert model.partitioned_by[0].sql("clickhouse") == 'toMonday(CAST("ds" AS DateTime64))'
373+
assert (
374+
model.partitioned_by[0].sql("clickhouse") == """toMonday(CAST("ds" AS DateTime64('UTC')))"""
375+
)
373376

377+
# user specifies without time column, unknown time column type
374378
model = load_sql_based_model(
375379
parse(
376380
"""
@@ -392,10 +396,58 @@ def test_partitioned_by_expr(make_mocked_engine_adapter: t.Callable):
392396
)
393397

394398
assert [p.sql("clickhouse") for p in model.partitioned_by] == [
395-
'toMonday(CAST("ds" AS DateTime64))',
399+
"""toMonday(CAST("ds" AS DateTime64('UTC')))""",
396400
'"x"',
397401
]
398402

403+
# user doesn't specify, conformable date/datetime time column type
404+
model = load_sql_based_model(
405+
parse(
406+
"""
407+
MODEL (
408+
name foo,
409+
dialect clickhouse,
410+
kind INCREMENTAL_BY_TIME_RANGE(
411+
time_column ds
412+
)
413+
);
414+
415+
select
416+
ds::DATE as ds
417+
from bar;
418+
""",
419+
default_dialect="clickhouse",
420+
)
421+
)
422+
423+
assert model.partitioned_by[0].sql("clickhouse") == 'toMonday("ds")'
424+
425+
# user doesn't specify, non-conformable time column type
426+
model = load_sql_based_model(
427+
parse(
428+
"""
429+
MODEL (
430+
name foo,
431+
dialect clickhouse,
432+
kind INCREMENTAL_BY_TIME_RANGE(
433+
time_column ds
434+
)
435+
);
436+
437+
select
438+
ds::String as ds
439+
from bar;
440+
""",
441+
default_dialect="clickhouse",
442+
)
443+
)
444+
445+
assert (
446+
model.partitioned_by[0].sql("clickhouse")
447+
== """CAST(toMonday(CAST("ds" AS DateTime64('UTC'))) AS String)"""
448+
)
449+
450+
# user specifies partitioned_by with time column
399451
model = load_sql_based_model(
400452
parse(
401453
"""

0 commit comments

Comments
 (0)