Skip to content

Commit e1bc3c6

Browse files
committed
Revert "Chore!: Use version instead of identifier in the seeds table (#2481)"
This reverts commit 0efe330.
1 parent 0efe330 commit e1bc3c6

File tree

4 files changed

+69
-128
lines changed

4 files changed

+69
-128
lines changed

sqlmesh/core/plan/builder.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,9 @@ def _categorize_snapshots(
472472
indirectly_modified,
473473
)
474474

475+
# set to breaking if an indirect child has no directly modified parents
476+
# that need a decision. this can happen when a revert to a parent causes
477+
# an indirectly modified snapshot to be created because of a new parent
475478
if (
476479
not is_directly_modified
477480
and not snapshot.version
@@ -481,18 +484,11 @@ def _categorize_snapshots(
481484
for upstream in dag.upstream(s_id)
482485
)
483486
):
484-
if self._context_diff.indirectly_modified(snapshot.name):
485-
# Set to breaking if an indirect child has no directly modified parents
486-
# that need a decision. this can happen when a revert to a parent causes
487-
# an indirectly modified snapshot to be created because of a new parent
488-
snapshot.categorize_as(
489-
SnapshotChangeCategory.FORWARD_ONLY
490-
if self._is_forward_only_model(s_id)
491-
else SnapshotChangeCategory.INDIRECT_BREAKING
492-
)
493-
else:
494-
# Metadata updated.
495-
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
487+
snapshot.categorize_as(
488+
SnapshotChangeCategory.FORWARD_ONLY
489+
if self._is_forward_only_model(s_id)
490+
else SnapshotChangeCategory.INDIRECT_BREAKING
491+
)
496492

497493
elif s_id in self._context_diff.added and self._is_new_snapshot(snapshot):
498494
snapshot.categorize_as(

sqlmesh/core/state_sync/engine_adapter.py

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ def __init__(
144144

145145
self._seed_columns_to_types = {
146146
"name": exp.DataType.build("text"),
147-
"version": exp.DataType.build("text"),
147+
"identifier": exp.DataType.build("text"),
148148
"content": exp.DataType.build("text"),
149149
}
150150

@@ -220,7 +220,7 @@ def _push_snapshots(self, snapshots: t.Iterable[Snapshot], overwrite: bool = Fal
220220
seed_contents.append(
221221
{
222222
"name": snapshot.name,
223-
"version": snapshot.version,
223+
"identifier": snapshot.identifier,
224224
"content": seed_model.seed.content,
225225
}
226226
)
@@ -341,10 +341,6 @@ def _is_snapshot_used(snapshot: Snapshot) -> bool:
341341
)
342342
)
343343

344-
seed_deletion_candidates = [t.snapshot for t in cleanup_targets if not t.dev_table_only]
345-
if seed_deletion_candidates:
346-
self._delete_seeds(seed_deletion_candidates)
347-
348344
return cleanup_targets
349345

350346
def delete_expired_environments(self) -> t.List[Environment]:
@@ -372,6 +368,7 @@ def delete_expired_environments(self) -> t.List[Environment]:
372368
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
373369
for where in self._snapshot_id_filter(snapshot_ids):
374370
self.engine_adapter.delete_from(self.snapshots_table, where=where)
371+
self.engine_adapter.delete_from(self.seeds_table, where=where)
375372

376373
def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
377374
return self._snapshot_ids_exist(snapshot_ids, self.snapshots_table)
@@ -489,8 +486,8 @@ def _get_snapshots(
489486
exp.to_table(self.seeds_table).as_("seeds"),
490487
on=exp.and_(
491488
exp.column("name", table="snapshots").eq(exp.column("name", table="seeds")),
492-
exp.column("version", table="snapshots").eq(
493-
exp.column("version", table="seeds")
489+
exp.column("identifier", table="snapshots").eq(
490+
exp.column("identifier", table="seeds")
494491
),
495492
),
496493
join_type="left",
@@ -1001,6 +998,7 @@ def _migrate_rows(self, promoted_snapshots_only: bool) -> None:
1001998
if not snapshot_mapping:
1002999
logger.info("No changes to snapshots detected")
10031000
return
1001+
self._migrate_seed_rows(snapshot_mapping)
10041002
self._migrate_environment_rows(environments, snapshot_mapping)
10051003

10061004
def _migrate_snapshot_rows(
@@ -1149,6 +1147,51 @@ def _visit(
11491147

11501148
return all_snapshot_mapping
11511149

1150+
def _migrate_seed_rows(self, snapshot_mapping: t.Dict[SnapshotId, SnapshotTableInfo]) -> None:
1151+
# FIXME: This migration won't be necessary if the primary key of the seeds table is changed to
1152+
# (name, version) instead of (name, identifier).
1153+
seed_snapshot_ids = [
1154+
s_id for s_id, table_info in snapshot_mapping.items() if table_info.is_seed
1155+
]
1156+
if not seed_snapshot_ids:
1157+
logger.info("No seed rows to migrate")
1158+
return
1159+
1160+
logger.info("Migrating seed rows...")
1161+
1162+
for where in self._snapshot_id_filter(
1163+
seed_snapshot_ids, batch_size=self.SNAPSHOT_SEED_MIGRATION_BATCH_SIZE
1164+
):
1165+
seeds = {
1166+
SnapshotId(name=name, identifier=identifier): content
1167+
for name, identifier, content in self._fetchall(
1168+
exp.select("name", "identifier", "content").from_(self.seeds_table).where(where)
1169+
)
1170+
}
1171+
if not seeds:
1172+
continue
1173+
1174+
new_seeds = {}
1175+
for snapshot_id, content in seeds.items():
1176+
new_snapshot_id = snapshot_mapping[snapshot_id].snapshot_id
1177+
new_seeds[new_snapshot_id] = {
1178+
"name": new_snapshot_id.name,
1179+
"identifier": new_snapshot_id.identifier,
1180+
"content": content,
1181+
}
1182+
1183+
existing_snapshot_ids = self._snapshot_ids_exist(new_seeds, self.seeds_table)
1184+
seeds_to_push = [
1185+
s for s_id, s in new_seeds.items() if s_id not in existing_snapshot_ids
1186+
]
1187+
1188+
if seeds_to_push:
1189+
self.engine_adapter.insert_append(
1190+
self.seeds_table,
1191+
pd.DataFrame(seeds_to_push),
1192+
columns_to_types=self._seed_columns_to_types,
1193+
)
1194+
11521195
def _migrate_environment_rows(
11531196
self,
11541197
environments: t.List[Environment],
@@ -1183,10 +1226,6 @@ def _migrate_environment_rows(
11831226
except Exception:
11841227
logger.warning("Failed to unpause migrated snapshots", exc_info=True)
11851228

1186-
def _delete_seeds(self, snapshots: t.Iterable[SnapshotNameVersionLike]) -> None:
1187-
for where in self._snapshot_name_version_filter(snapshots, alias=None):
1188-
self.engine_adapter.delete_from(self.seeds_table, where=where)
1189-
11901229
def _snapshot_ids_exist(
11911230
self, snapshot_ids: t.Iterable[SnapshotIdLike], table_name: exp.Table
11921231
) -> t.Set[SnapshotId]:

sqlmesh/migrations/v0049_replace_identifier_with_version_in_seeds_table.py

Lines changed: 0 additions & 57 deletions
This file was deleted.

tests/core/test_state_sync.py

Lines changed: 10 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,40 +1044,6 @@ def test_delete_expired_snapshots(state_sync: EngineAdapterStateSync, make_snaps
10441044
assert not state_sync.get_snapshots(None)
10451045

10461046

1047-
def test_delete_expired_snapshots_seed(
1048-
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
1049-
):
1050-
now_ts = now_timestamp()
1051-
1052-
snapshot = make_snapshot(
1053-
SeedModel(
1054-
name="a",
1055-
kind=SeedKind(path="./path/to/seed"),
1056-
seed=Seed(content="header\n1\n2"),
1057-
column_hashes={"header": "hash"},
1058-
depends_on=set(),
1059-
),
1060-
)
1061-
snapshot.ttl = "in 10 seconds"
1062-
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
1063-
snapshot.updated_ts = now_ts - 15000
1064-
1065-
state_sync.push_snapshots([snapshot])
1066-
assert set(state_sync.get_snapshots(None)) == {snapshot.snapshot_id}
1067-
assert state_sync.engine_adapter.fetchall(
1068-
"SELECT name, version, content FROM sqlmesh._seeds"
1069-
) == [
1070-
(snapshot.name, snapshot.version, snapshot.model.seed.content),
1071-
]
1072-
1073-
assert state_sync.delete_expired_snapshots() == [
1074-
SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=False),
1075-
]
1076-
1077-
assert not state_sync.get_snapshots(None)
1078-
assert not state_sync.engine_adapter.fetchall("SELECT * FROM sqlmesh._seeds")
1079-
1080-
10811047
def test_delete_expired_snapshots_batching(
10821048
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
10831049
):
@@ -1615,8 +1581,6 @@ def test_migrate_rows(state_sync: EngineAdapterStateSync, mocker: MockerFixture)
16151581
},
16161582
)
16171583

1618-
state_sync.engine_adapter.drop_table("sqlmesh._seeds")
1619-
16201584
old_snapshots = state_sync.engine_adapter.fetchdf("select * from sqlmesh._snapshots")
16211585
old_environments = state_sync.engine_adapter.fetchdf("select * from sqlmesh._environments")
16221586

@@ -2160,29 +2124,28 @@ def test_snapshot_batching(state_sync, mocker, make_snapshot):
21602124
state_sync.SNAPSHOT_BATCH_SIZE = 2
21612125
state_sync.engine_adapter = mock
21622126

2163-
snapshot_a = make_snapshot(SqlModel(name="a", query=parse_one("select 1")), "1")
2164-
snapshot_b = make_snapshot(SqlModel(name="a", query=parse_one("select 2")), "2")
2165-
snapshot_c = make_snapshot(SqlModel(name="a", query=parse_one("select 3")), "3")
2166-
21672127
state_sync.delete_snapshots(
21682128
(
2169-
snapshot_a,
2170-
snapshot_b,
2171-
snapshot_c,
2129+
SnapshotId(name="a", identifier="1"),
2130+
SnapshotId(name="a", identifier="2"),
2131+
SnapshotId(name="a", identifier="3"),
21722132
)
21732133
)
21742134
calls = mock.delete_from.call_args_list
21752135
assert mock.delete_from.call_args_list == [
21762136
call(
21772137
exp.to_table("sqlmesh._snapshots"),
2178-
where=parse_one(
2179-
f"(name, identifier) in (('\"a\"', '{snapshot_b.identifier}'), ('\"a\"', '{snapshot_a.identifier}'))"
2180-
),
2138+
where=parse_one("(name, identifier) in (('a', '1'), ('a', '2'))"),
2139+
),
2140+
call(
2141+
exp.to_table("sqlmesh._seeds"),
2142+
where=parse_one("(name, identifier) in (('a', '1'), ('a', '2'))"),
21812143
),
21822144
call(
21832145
exp.to_table("sqlmesh._snapshots"),
2184-
where=parse_one(f"(name, identifier) in (('\"a\"', '{snapshot_c.identifier}'))"),
2146+
where=parse_one("(name, identifier) in (('a', '3'))"),
21852147
),
2148+
call(exp.to_table("sqlmesh._seeds"), where=parse_one("(name, identifier) in (('a', '3'))")),
21862149
]
21872150

21882151
mock.fetchall.side_effect = [

0 commit comments

Comments
 (0)