Skip to content

Commit 36c7efe

Browse files
committed
Establish session before snapshot promotion/demotion
1 parent 5b9e2f9 commit 36c7efe

File tree

3 files changed

+33
-18
lines changed

3 files changed

+33
-18
lines changed

sqlmesh/core/plan/evaluator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ def visit_virtual_layer_update_stage(
334334
)
335335
if stage.demoted_environment_naming_info:
336336
self._demote_snapshots(
337-
stage.demoted_snapshots,
337+
[stage.all_snapshots[s.snapshot_id] for s in stage.demoted_snapshots],
338338
stage.demoted_environment_naming_info,
339339
on_complete=lambda s: self.console.update_promotion_progress(s, False),
340340
)
@@ -376,7 +376,7 @@ def _promote_snapshots(
376376

377377
def _demote_snapshots(
378378
self,
379-
target_snapshots: t.Iterable[SnapshotTableInfo],
379+
target_snapshots: t.Iterable[Snapshot],
380380
environment_naming_info: EnvironmentNamingInfo,
381381
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
382382
) -> None:

sqlmesh/core/snapshot/evaluator.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ def promote(
275275

276276
def demote(
277277
self,
278-
target_snapshots: t.Iterable[SnapshotInfoLike],
278+
target_snapshots: t.Iterable[Snapshot],
279279
environment_naming_info: EnvironmentNamingInfo,
280280
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
281281
) -> None:
@@ -988,25 +988,30 @@ def _promote_snapshot(
988988
table_mapping=table_mapping,
989989
runtime_stage=RuntimeStage.PROMOTING,
990990
)
991-
_evaluation_strategy(snapshot, adapter).promote(
992-
table_name=table_name,
993-
view_name=view_name,
994-
model=snapshot.model,
995-
environment=environment_naming_info.name,
996-
snapshots=snapshots,
997-
**render_kwargs,
998-
)
999991

1000-
snapshot_by_name = {s.name: s for s in (snapshots or {}).values()}
1001-
render_kwargs["snapshots"] = snapshot_by_name
1002-
adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs))
992+
with (
993+
adapter.transaction(),
994+
adapter.session(snapshot.model.render_session_properties(**render_kwargs)),
995+
):
996+
_evaluation_strategy(snapshot, adapter).promote(
997+
table_name=table_name,
998+
view_name=view_name,
999+
model=snapshot.model,
1000+
environment=environment_naming_info.name,
1001+
snapshots=snapshots,
1002+
**render_kwargs,
1003+
)
1004+
1005+
snapshot_by_name = {s.name: s for s in (snapshots or {}).values()}
1006+
render_kwargs["snapshots"] = snapshot_by_name
1007+
adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs))
10031008

10041009
if on_complete is not None:
10051010
on_complete(snapshot)
10061011

10071012
def _demote_snapshot(
10081013
self,
1009-
snapshot: SnapshotInfoLike,
1014+
snapshot: Snapshot,
10101015
environment_naming_info: EnvironmentNamingInfo,
10111016
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]],
10121017
) -> None:
@@ -1018,7 +1023,18 @@ def _demote_snapshot(
10181023
view_name = snapshot.qualified_view_name.for_environment(
10191024
environment_naming_info, dialect=adapter.dialect
10201025
)
1021-
_evaluation_strategy(snapshot, adapter).demote(view_name)
1026+
session_properties = (
1027+
snapshot.model.render_session_properties(
1028+
engine_adapter=adapter, runtime_stage=RuntimeStage.PROMOTING
1029+
)
1030+
if snapshot.is_model
1031+
else {}
1032+
)
1033+
with (
1034+
adapter.transaction(),
1035+
adapter.session(session_properties),
1036+
):
1037+
_evaluation_strategy(snapshot, adapter).demote(view_name)
10221038

10231039
if on_complete is not None:
10241040
on_complete(snapshot)

sqlmesh/engines/commands.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
SnapshotEvaluator,
1010
SnapshotId,
1111
SnapshotTableCleanupTask,
12-
SnapshotTableInfo,
1312
)
1413
from sqlmesh.core.state_sync import cleanup_expired_views
1514
from sqlmesh.utils.date import TimeLike
@@ -49,7 +48,7 @@ class PromoteCommandPayload(PydanticModel):
4948

5049

5150
class DemoteCommandPayload(PydanticModel):
52-
snapshots: t.List[SnapshotTableInfo]
51+
snapshots: t.List[Snapshot]
5352
environment_naming_info: EnvironmentNamingInfo
5453

5554

0 commit comments

Comments
 (0)