Skip to content

Commit a0e55de

Browse files
authored
Feat: Support restatements for the incremental unmanaged model kind (#2423)
1 parent c47bf45 commit a0e55de

File tree

12 files changed

+524
-59
lines changed

12 files changed

+524
-59
lines changed

docs/concepts/models/model_kinds.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ WHERE
166166
event_date BETWEEN @start_date AND @end_date;
167167
```
168168

169-
**Note:** Models of the `INCREMENTAL_BY_UNIQUE_KEY` kind are inherently [non-idempotent](../glossary.md#idempotency), which should be taken into consideration during data [restatement](../plans.md#restatement-plans).
169+
**Note:** Models of the `INCREMENTAL_BY_UNIQUE_KEY` kind are inherently [non-idempotent](../glossary.md#idempotency), which should be taken into consideration during data [restatement](../plans.md#restatement-plans). As a result, partial data restatement is not supported for this model kind, which means that the entire table will be recreated from scratch if restated.
170170

171171
### Unique Key Expressions
172172

@@ -320,7 +320,9 @@ SQLMesh achieves this by adding a `valid_from` and `valid_to` column to your mod
320320

321321
Therefore you can use these models to not only tell you what the latest value is for a given record but also what the values were anytime in the past. Note that maintaining this history does come at a cost of increased storage and compute and this may not be a good fit for sources that change frequently since the history could get very large.
322322

323-
There are two ways to tracking changes: By Time (Recommended) or By Column.
323+
**Note**: Partial data [restatement](../plans.md#restatement-plans) is not supported for this model kind, which means that the entire table will be recreated from scratch if restated. This may lead to data loss, which is why data restatement is disabled for models of this kind by default.
324+
325+
There are two ways to tracking changes: By Time (Recommended) or By Column.
324326

325327
### SCD Type 2 By Time (Recommended)
326328

docs/concepts/plans.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ For this reason, the `plan` command supports the `--restate-model`, which allows
133133

134134
Application of a plan will trigger a cascading backfill for all specified models (other than external tables), as well as all models downstream from them. The plan's date range determines the data intervals that will be affected.
135135

136+
Please note that models of kinds [INCREMENTAL_BY_UNIQUE_KEY](models/model_kinds.md#INCREMENTAL_BY_UNIQUE_KEY), [SCD_TYPE_2_BY_TIME](models/model_kinds.md#scd-type-2), and [SCD_TYPE_2_BY_COLUMN](models/model_kinds.md#scd-type-2) cannot be partially restated. Therefore, such models will be fully refreshed regardless of the start/end dates provided by a user in the plan.
137+
138+
To prevent models from ever being restated, set the [disable_restatement](models/overview.md#disable_restatement) attribute to `true`.
139+
136140
See examples below for how to restate both based on model names and model tags.
137141

138142

sqlmesh/core/engine_adapter/base.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,6 +1159,7 @@ def scd_type_2_by_time(
11591159
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
11601160
table_description: t.Optional[str] = None,
11611161
column_descriptions: t.Optional[t.Dict[str, str]] = None,
1162+
truncate: bool = False,
11621163
**kwargs: t.Any,
11631164
) -> None:
11641165
self._scd_type_2(
@@ -1174,6 +1175,7 @@ def scd_type_2_by_time(
11741175
columns_to_types=columns_to_types,
11751176
table_description=table_description,
11761177
column_descriptions=column_descriptions,
1178+
truncate=truncate,
11771179
)
11781180

11791181
def scd_type_2_by_column(
@@ -1190,6 +1192,7 @@ def scd_type_2_by_column(
11901192
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
11911193
table_description: t.Optional[str] = None,
11921194
column_descriptions: t.Optional[t.Dict[str, str]] = None,
1195+
truncate: bool = False,
11931196
**kwargs: t.Any,
11941197
) -> None:
11951198
self._scd_type_2(
@@ -1205,6 +1208,7 @@ def scd_type_2_by_column(
12051208
execution_time_as_valid_from=execution_time_as_valid_from,
12061209
table_description=table_description,
12071210
column_descriptions=column_descriptions,
1211+
truncate=truncate,
12081212
)
12091213

12101214
def _scd_type_2(
@@ -1223,6 +1227,7 @@ def _scd_type_2(
12231227
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
12241228
table_description: t.Optional[str] = None,
12251229
column_descriptions: t.Optional[t.Dict[str, str]] = None,
1230+
truncate: bool = False,
12261231
) -> None:
12271232
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
12281233
source_table, columns_to_types, target_table=target_table, batch_size=0
@@ -1366,6 +1371,11 @@ def _scd_type_2(
13661371
.when(exp.column(f"t_{valid_from_name}").is_(exp.Null()), update_valid_from_start)
13671372
.else_(exp.column(f"t_{valid_from_name}"))
13681373
).as_(valid_from_name)
1374+
1375+
existing_rows_query = exp.select(*table_columns).from_(target_table)
1376+
if truncate:
1377+
existing_rows_query = existing_rows_query.limit(0)
1378+
13691379
with source_queries[0] as source_query:
13701380
query = (
13711381
exp.Select() # type: ignore
@@ -1378,16 +1388,12 @@ def _scd_type_2(
13781388
# Historical Records that Do Not Change
13791389
.with_(
13801390
"static",
1381-
exp.select(*table_columns)
1382-
.from_(target_table)
1383-
.where(f"{valid_to_name} IS NOT NULL"),
1391+
existing_rows_query.where(f"{valid_to_name} IS NOT NULL"),
13841392
)
13851393
# Latest Records that can be updated
13861394
.with_(
13871395
"latest",
1388-
exp.select(*table_columns)
1389-
.from_(target_table)
1390-
.where(f"{valid_to_name} IS NULL"),
1396+
existing_rows_query.where(f"{valid_to_name} IS NULL"),
13911397
)
13921398
# Deleted records which can be used to determine `valid_from` for undeleted source records
13931399
.with_(

sqlmesh/core/engine_adapter/trino.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ def _scd_type_2(
216216
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
217217
table_description: t.Optional[str] = None,
218218
column_descriptions: t.Optional[t.Dict[str, str]] = None,
219+
truncate: bool = False,
219220
) -> None:
220221
if columns_to_types and self.current_catalog_type == "delta_lake":
221222
columns_to_types = self._to_delta_ts(columns_to_types)
@@ -235,6 +236,7 @@ def _scd_type_2(
235236
columns_to_types,
236237
table_description,
237238
column_descriptions,
239+
truncate,
238240
)
239241

240242
# delta_lake only supports two timestamp data types. This method converts other

sqlmesh/core/model/kind.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,13 @@ def only_execution_time(self) -> bool:
115115
"""Whether or not this model only cares about execution time to render."""
116116
return self.is_view or self.is_full or self.is_scd_type_2
117117

118+
@property
119+
def full_history_restatement_only(self) -> bool:
120+
"""Whether or not this model only supports restatement of full history."""
121+
return (
122+
self.is_incremental_unmanaged or self.is_incremental_by_unique_key or self.is_scd_type_2
123+
)
124+
118125

119126
class ModelKindName(str, ModelKindMixin, Enum):
120127
"""The kind of model, determining how this data is computed and stored in the warehouse."""
@@ -342,7 +349,7 @@ class IncrementalUnmanagedKind(_ModelKind):
342349
name: Literal[ModelKindName.INCREMENTAL_UNMANAGED] = ModelKindName.INCREMENTAL_UNMANAGED
343350
insert_overwrite: SQLGlotBool = False
344351
forward_only: SQLGlotBool = True
345-
disable_restatement: Literal[True] = True
352+
disable_restatement: SQLGlotBool = True
346353

347354
@property
348355
def data_hash_values(self) -> t.List[t.Optional[str]]:

sqlmesh/core/snapshot/definition.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,13 @@ def get_removal_interval(
675675
*,
676676
strict: bool = True,
677677
) -> Interval:
678-
end = execution_time or now() if self.depends_on_past else end
678+
end = (
679+
execution_time or now()
680+
if self.depends_on_past or self.full_history_restatement_only
681+
else end
682+
)
683+
if self.full_history_restatement_only and self.intervals:
684+
start = self.intervals[0][0]
679685
return self.inclusive_exclusive(start, end, strict)
680686

681687
def inclusive_exclusive(

sqlmesh/core/snapshot/evaluator.py

Lines changed: 60 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
)
5050
from sqlmesh.core.snapshot import (
5151
DeployabilityIndex,
52+
Intervals,
5253
QualifiedViewName,
5354
Snapshot,
5455
SnapshotChangeCategory,
@@ -845,7 +846,7 @@ def insert(
845846
name: str,
846847
query_or_df: QueryOrDF,
847848
snapshots: t.Dict[str, Snapshot],
848-
deployability_index: t.Optional[DeployabilityIndex],
849+
deployability_index: DeployabilityIndex,
849850
**kwargs: t.Any,
850851
) -> None:
851852
"""Inserts the given query or a DataFrame into the target table or replaces a view.
@@ -865,7 +866,7 @@ def append(
865866
table_name: str,
866867
query_or_df: QueryOrDF,
867868
snapshots: t.Dict[str, Snapshot],
868-
deployability_index: t.Optional[DeployabilityIndex],
869+
deployability_index: DeployabilityIndex,
869870
**kwargs: t.Any,
870871
) -> None:
871872
"""Appends the given query or a DataFrame to the existing table.
@@ -952,6 +953,27 @@ def demote(
952953
environment_naming_info: Naming information for the target environment.
953954
"""
954955

956+
def _replace_query_for_model(self, model: Model, name: str, query_or_df: QueryOrDF) -> None:
957+
"""Replaces the table for the given model.
958+
959+
Args:
960+
model: The target model.
961+
name: The name of the target table.
962+
query_or_df: The query or DataFrame to replace the target table with.
963+
"""
964+
self.adapter.replace_query(
965+
name,
966+
query_or_df,
967+
columns_to_types=model.columns_to_types if model.annotated else None,
968+
storage_format=model.storage_format,
969+
partitioned_by=model.partitioned_by,
970+
partition_interval_unit=model.interval_unit,
971+
clustered_by=model.clustered_by,
972+
table_properties=model.table_properties,
973+
table_description=model.description,
974+
column_descriptions=model.column_descriptions,
975+
)
976+
955977

956978
class SymbolicStrategy(EvaluationStrategy):
957979
def insert(
@@ -960,7 +982,7 @@ def insert(
960982
name: str,
961983
query_or_df: QueryOrDF,
962984
snapshots: t.Dict[str, Snapshot],
963-
deployability_index: t.Optional[DeployabilityIndex],
985+
deployability_index: DeployabilityIndex,
964986
**kwargs: t.Any,
965987
) -> None:
966988
pass
@@ -971,7 +993,7 @@ def append(
971993
table_name: str,
972994
query_or_df: QueryOrDF,
973995
snapshots: t.Dict[str, Snapshot],
974-
deployability_index: t.Optional[DeployabilityIndex],
996+
deployability_index: DeployabilityIndex,
975997
**kwargs: t.Any,
976998
) -> None:
977999
pass
@@ -1063,7 +1085,7 @@ def append(
10631085
table_name: str,
10641086
query_or_df: QueryOrDF,
10651087
snapshots: t.Dict[str, Snapshot],
1066-
deployability_index: t.Optional[DeployabilityIndex],
1088+
deployability_index: DeployabilityIndex,
10671089
**kwargs: t.Any,
10681090
) -> None:
10691091
model = snapshot.model
@@ -1138,7 +1160,7 @@ def insert(
11381160
name: str,
11391161
query_or_df: QueryOrDF,
11401162
snapshots: t.Dict[str, Snapshot],
1141-
deployability_index: t.Optional[DeployabilityIndex],
1163+
deployability_index: DeployabilityIndex,
11421164
**kwargs: t.Any,
11431165
) -> None:
11441166
model = snapshot.model
@@ -1160,17 +1182,20 @@ def insert(
11601182
name: str,
11611183
query_or_df: QueryOrDF,
11621184
snapshots: t.Dict[str, Snapshot],
1163-
deployability_index: t.Optional[DeployabilityIndex],
1185+
deployability_index: DeployabilityIndex,
11641186
**kwargs: t.Any,
11651187
) -> None:
11661188
model = snapshot.model
1167-
self.adapter.merge(
1168-
name,
1169-
query_or_df,
1170-
columns_to_types=model.columns_to_types,
1171-
unique_key=model.unique_key,
1172-
when_matched=model.when_matched,
1173-
)
1189+
if not _intervals(snapshot, deployability_index):
1190+
self._replace_query_for_model(model, name, query_or_df)
1191+
else:
1192+
self.adapter.merge(
1193+
name,
1194+
query_or_df,
1195+
columns_to_types=model.columns_to_types,
1196+
unique_key=model.unique_key,
1197+
when_matched=model.when_matched,
1198+
)
11741199

11751200
def append(
11761201
self,
@@ -1198,11 +1223,13 @@ def insert(
11981223
name: str,
11991224
query_or_df: QueryOrDF,
12001225
snapshots: t.Dict[str, Snapshot],
1201-
deployability_index: t.Optional[DeployabilityIndex],
1226+
deployability_index: DeployabilityIndex,
12021227
**kwargs: t.Any,
12031228
) -> None:
12041229
model = snapshot.model
1205-
if isinstance(model.kind, IncrementalUnmanagedKind) and model.kind.insert_overwrite:
1230+
if not _intervals(snapshot, deployability_index):
1231+
self._replace_query_for_model(model, name, query_or_df)
1232+
elif isinstance(model.kind, IncrementalUnmanagedKind) and model.kind.insert_overwrite:
12061233
self.adapter.insert_overwrite_by_partition(
12071234
name,
12081235
query_or_df,
@@ -1227,22 +1254,11 @@ def insert(
12271254
name: str,
12281255
query_or_df: QueryOrDF,
12291256
snapshots: t.Dict[str, Snapshot],
1230-
deployability_index: t.Optional[DeployabilityIndex],
1257+
deployability_index: DeployabilityIndex,
12311258
**kwargs: t.Any,
12321259
) -> None:
12331260
model = snapshot.model
1234-
self.adapter.replace_query(
1235-
name,
1236-
query_or_df,
1237-
columns_to_types=model.columns_to_types if model.annotated else None,
1238-
storage_format=model.storage_format,
1239-
partitioned_by=model.partitioned_by,
1240-
partition_interval_unit=model.interval_unit,
1241-
clustered_by=model.clustered_by,
1242-
table_properties=model.table_properties,
1243-
table_description=model.description,
1244-
column_descriptions=model.column_descriptions,
1245-
)
1261+
self._replace_query_for_model(model, name, query_or_df)
12461262

12471263

12481264
class SCDType2Strategy(MaterializableStrategy):
@@ -1290,10 +1306,11 @@ def insert(
12901306
name: str,
12911307
query_or_df: QueryOrDF,
12921308
snapshots: t.Dict[str, Snapshot],
1293-
deployability_index: t.Optional[DeployabilityIndex],
1309+
deployability_index: DeployabilityIndex,
12941310
**kwargs: t.Any,
12951311
) -> None:
12961312
model = snapshot.model
1313+
truncate = not _intervals(snapshot, deployability_index)
12971314
if isinstance(model.kind, SCDType2ByTimeKind):
12981315
self.adapter.scd_type_2_by_time(
12991316
target_table=name,
@@ -1307,6 +1324,7 @@ def insert(
13071324
columns_to_types=model.columns_to_types,
13081325
table_description=model.description,
13091326
column_descriptions=model.column_descriptions,
1327+
truncate=truncate,
13101328
**kwargs,
13111329
)
13121330
elif isinstance(model.kind, SCDType2ByColumnKind):
@@ -1322,6 +1340,7 @@ def insert(
13221340
execution_time_as_valid_from=model.kind.execution_time_as_valid_from,
13231341
table_description=model.description,
13241342
column_descriptions=model.column_descriptions,
1343+
truncate=truncate,
13251344
**kwargs,
13261345
)
13271346
else:
@@ -1335,7 +1354,7 @@ def append(
13351354
table_name: str,
13361355
query_or_df: QueryOrDF,
13371356
snapshots: t.Dict[str, Snapshot],
1338-
deployability_index: t.Optional[DeployabilityIndex],
1357+
deployability_index: DeployabilityIndex,
13391358
**kwargs: t.Any,
13401359
) -> None:
13411360
model = snapshot.model
@@ -1382,7 +1401,7 @@ def insert(
13821401
name: str,
13831402
query_or_df: QueryOrDF,
13841403
snapshots: t.Dict[str, Snapshot],
1385-
deployability_index: t.Optional[DeployabilityIndex],
1404+
deployability_index: DeployabilityIndex,
13861405
**kwargs: t.Any,
13871406
) -> None:
13881407
model = snapshot.model
@@ -1425,7 +1444,7 @@ def append(
14251444
table_name: str,
14261445
query_or_df: QueryOrDF,
14271446
snapshots: t.Dict[str, Snapshot],
1428-
deployability_index: t.Optional[DeployabilityIndex],
1447+
deployability_index: DeployabilityIndex,
14291448
**kwargs: t.Any,
14301449
) -> None:
14311450
raise ConfigError(f"Cannot append to a view '{table_name}'.")
@@ -1485,3 +1504,11 @@ def delete(self, name: str) -> None:
14851504

14861505
def _is_materialized_view(self, model: Model) -> bool:
14871506
return isinstance(model.kind, ViewKind) and model.kind.materialized
1507+
1508+
1509+
def _intervals(snapshot: Snapshot, deployability_index: DeployabilityIndex) -> Intervals:
1510+
return (
1511+
snapshot.intervals
1512+
if deployability_index.is_deployable(snapshot)
1513+
else snapshot.dev_intervals
1514+
)

0 commit comments

Comments
 (0)