Skip to content

Commit 0efe330

Browse files
authored
Chore!: Use version instead of identifier in the seeds table (#2481)
1 parent c6ec1d7 commit 0efe330

File tree

4 files changed

+128
-69
lines changed

4 files changed

+128
-69
lines changed

sqlmesh/core/plan/builder.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -472,9 +472,6 @@ def _categorize_snapshots(
472472
indirectly_modified,
473473
)
474474

475-
# set to breaking if an indirect child has no directly modified parents
476-
# that need a decision. this can happen when a revert to a parent causes
477-
# an indirectly modified snapshot to be created because of a new parent
478475
if (
479476
not is_directly_modified
480477
and not snapshot.version
@@ -484,11 +481,18 @@ def _categorize_snapshots(
484481
for upstream in dag.upstream(s_id)
485482
)
486483
):
487-
snapshot.categorize_as(
488-
SnapshotChangeCategory.FORWARD_ONLY
489-
if self._is_forward_only_model(s_id)
490-
else SnapshotChangeCategory.INDIRECT_BREAKING
491-
)
484+
if self._context_diff.indirectly_modified(snapshot.name):
485+
# Set to breaking if an indirect child has no directly modified parents
486+
# that need a decision. This can happen when a revert to a parent causes
487+
# an indirectly modified snapshot to be created because of a new parent
488+
snapshot.categorize_as(
489+
SnapshotChangeCategory.FORWARD_ONLY
490+
if self._is_forward_only_model(s_id)
491+
else SnapshotChangeCategory.INDIRECT_BREAKING
492+
)
493+
else:
494+
# Metadata updated.
495+
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
492496

493497
elif s_id in self._context_diff.added and self._is_new_snapshot(snapshot):
494498
snapshot.categorize_as(

sqlmesh/core/state_sync/engine_adapter.py

Lines changed: 12 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ def __init__(
144144

145145
self._seed_columns_to_types = {
146146
"name": exp.DataType.build("text"),
147-
"identifier": exp.DataType.build("text"),
147+
"version": exp.DataType.build("text"),
148148
"content": exp.DataType.build("text"),
149149
}
150150

@@ -220,7 +220,7 @@ def _push_snapshots(self, snapshots: t.Iterable[Snapshot], overwrite: bool = Fal
220220
seed_contents.append(
221221
{
222222
"name": snapshot.name,
223-
"identifier": snapshot.identifier,
223+
"version": snapshot.version,
224224
"content": seed_model.seed.content,
225225
}
226226
)
@@ -341,6 +341,10 @@ def _is_snapshot_used(snapshot: Snapshot) -> bool:
341341
)
342342
)
343343

344+
seed_deletion_candidates = [t.snapshot for t in cleanup_targets if not t.dev_table_only]
345+
if seed_deletion_candidates:
346+
self._delete_seeds(seed_deletion_candidates)
347+
344348
return cleanup_targets
345349

346350
def delete_expired_environments(self) -> t.List[Environment]:
@@ -368,7 +372,6 @@ def delete_expired_environments(self) -> t.List[Environment]:
368372
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
369373
for where in self._snapshot_id_filter(snapshot_ids):
370374
self.engine_adapter.delete_from(self.snapshots_table, where=where)
371-
self.engine_adapter.delete_from(self.seeds_table, where=where)
372375

373376
def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
374377
return self._snapshot_ids_exist(snapshot_ids, self.snapshots_table)
@@ -486,8 +489,8 @@ def _get_snapshots(
486489
exp.to_table(self.seeds_table).as_("seeds"),
487490
on=exp.and_(
488491
exp.column("name", table="snapshots").eq(exp.column("name", table="seeds")),
489-
exp.column("identifier", table="snapshots").eq(
490-
exp.column("identifier", table="seeds")
492+
exp.column("version", table="snapshots").eq(
493+
exp.column("version", table="seeds")
491494
),
492495
),
493496
join_type="left",
@@ -998,7 +1001,6 @@ def _migrate_rows(self, promoted_snapshots_only: bool) -> None:
9981001
if not snapshot_mapping:
9991002
logger.info("No changes to snapshots detected")
10001003
return
1001-
self._migrate_seed_rows(snapshot_mapping)
10021004
self._migrate_environment_rows(environments, snapshot_mapping)
10031005

10041006
def _migrate_snapshot_rows(
@@ -1147,51 +1149,6 @@ def _visit(
11471149

11481150
return all_snapshot_mapping
11491151

1150-
def _migrate_seed_rows(self, snapshot_mapping: t.Dict[SnapshotId, SnapshotTableInfo]) -> None:
1151-
# FIXME: This migration won't be necessary if the primary key of the seeds table is changed to
1152-
# (name, version) instead of (name, identifier).
1153-
seed_snapshot_ids = [
1154-
s_id for s_id, table_info in snapshot_mapping.items() if table_info.is_seed
1155-
]
1156-
if not seed_snapshot_ids:
1157-
logger.info("No seed rows to migrate")
1158-
return
1159-
1160-
logger.info("Migrating seed rows...")
1161-
1162-
for where in self._snapshot_id_filter(
1163-
seed_snapshot_ids, batch_size=self.SNAPSHOT_SEED_MIGRATION_BATCH_SIZE
1164-
):
1165-
seeds = {
1166-
SnapshotId(name=name, identifier=identifier): content
1167-
for name, identifier, content in self._fetchall(
1168-
exp.select("name", "identifier", "content").from_(self.seeds_table).where(where)
1169-
)
1170-
}
1171-
if not seeds:
1172-
continue
1173-
1174-
new_seeds = {}
1175-
for snapshot_id, content in seeds.items():
1176-
new_snapshot_id = snapshot_mapping[snapshot_id].snapshot_id
1177-
new_seeds[new_snapshot_id] = {
1178-
"name": new_snapshot_id.name,
1179-
"identifier": new_snapshot_id.identifier,
1180-
"content": content,
1181-
}
1182-
1183-
existing_snapshot_ids = self._snapshot_ids_exist(new_seeds, self.seeds_table)
1184-
seeds_to_push = [
1185-
s for s_id, s in new_seeds.items() if s_id not in existing_snapshot_ids
1186-
]
1187-
1188-
if seeds_to_push:
1189-
self.engine_adapter.insert_append(
1190-
self.seeds_table,
1191-
pd.DataFrame(seeds_to_push),
1192-
columns_to_types=self._seed_columns_to_types,
1193-
)
1194-
11951152
def _migrate_environment_rows(
11961153
self,
11971154
environments: t.List[Environment],
@@ -1226,6 +1183,10 @@ def _migrate_environment_rows(
12261183
except Exception:
12271184
logger.warning("Failed to unpause migrated snapshots", exc_info=True)
12281185

1186+
def _delete_seeds(self, snapshots: t.Iterable[SnapshotNameVersionLike]) -> None:
    """Delete the seeds-table rows that belong to the given snapshots.

    Args:
        snapshots: Snapshots whose seed rows should be removed. They are
            turned into one or more filter expressions by
            ``_snapshot_name_version_filter``.
    """
    # One DELETE is issued per filter expression produced by the helper.
    for condition in self._snapshot_name_version_filter(snapshots, alias=None):
        self.engine_adapter.delete_from(self.seeds_table, where=condition)
1189+
12291190
def _snapshot_ids_exist(
12301191
self, snapshot_ids: t.Iterable[SnapshotIdLike], table_name: exp.Table
12311192
) -> t.Set[SnapshotId]:
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
"""Use version instead of identifier in the seeds table."""
2+
3+
from sqlglot import exp
4+
5+
from sqlmesh.utils.migration import index_text_type
6+
7+
8+
def migrate(state_sync, **kwargs):  # type: ignore
    """Rebuild the seeds state table so its rows are keyed by (name, version).

    A replacement table with a (name, version) primary key is created and
    filled by joining the existing seeds table to the snapshots table on
    (name, identifier), which supplies the version for every seed row. The
    old seeds table is then dropped and the replacement is renamed.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted for signature compatibility; not used here.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema

    def _qualified(table_name):
        # Prefix with the state schema only when one is configured.
        return f"{schema}.{table_name}" if schema else table_name

    snapshots_table = _qualified("_snapshots")
    seeds_table = _qualified("_seeds")
    new_seeds_table = _qualified("_seeds_v49")

    index_type = index_text_type(adapter.dialect)

    # Drop the temporary table name before (re)creating it.
    adapter.drop_table(new_seeds_table)
    columns_to_types = {
        "name": exp.DataType.build(index_type),
        "version": exp.DataType.build(index_type),
        "content": exp.DataType.build("text"),
    }
    adapter.create_state_table(
        new_seeds_table,
        columns_to_types,
        primary_key=("name", "version"),
    )

    seed_name = exp.column("name", table="seeds")
    snapshot_version = exp.column("version", table="snapshots")

    # Seeds are still keyed by identifier, so match on (name, identifier).
    identity_match = exp.and_(
        exp.column("name", table="seeds").eq(exp.column("name", table="snapshots")),
        exp.column("identifier", table="seeds").eq(
            exp.column("identifier", table="snapshots")
        ),
    )
    # Only snapshot rows with a non-NULL version are carried over.
    has_version = exp.column("version", table="snapshots").is_(exp.null()).not_()

    # Grouping by (name, version) with MAX(content) yields a single row per
    # new primary key even when several identifiers share one version.
    query = (
        exp.select(
            seed_name,
            snapshot_version,
            exp.func("MAX", exp.column("content", table="seeds")).as_("content"),
        )
        .from_(exp.to_table(seeds_table).as_("seeds"))
        .join(
            exp.to_table(snapshots_table).as_("snapshots"),
            on=identity_match,
        )
        .where(has_version)
        .group_by(seed_name, snapshot_version)
    )

    adapter.insert_append(new_seeds_table, query)
    adapter.drop_table(seeds_table)
    # NOTE(review): the rename target is the bare "_seeds" name rather than
    # the schema-qualified `seeds_table` -- confirm this is what
    # `rename_table` expects when a schema is configured.
    adapter.rename_table(new_seeds_table, "_seeds")

tests/core/test_state_sync.py

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,6 +1044,40 @@ def test_delete_expired_snapshots(state_sync: EngineAdapterStateSync, make_snaps
10441044
assert not state_sync.get_snapshots(None)
10451045

10461046

1047+
def test_delete_expired_snapshots_seed(
    state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
):
    """Deleting an expired seed snapshot also removes its seeds-table row."""
    current_ts = now_timestamp()

    seed_model = SeedModel(
        name="a",
        kind=SeedKind(path="./path/to/seed"),
        seed=Seed(content="header\n1\n2"),
        column_hashes={"header": "hash"},
        depends_on=set(),
    )
    snapshot = make_snapshot(seed_model)
    snapshot.ttl = "in 10 seconds"
    snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
    # Presumably milliseconds, i.e. updated 15s ago against a 10s TTL -- confirm.
    snapshot.updated_ts = current_ts - 15000

    state_sync.push_snapshots([snapshot])

    # After the push, both the snapshot and its seed content are stored.
    stored_ids = set(state_sync.get_snapshots(None))
    assert stored_ids == {snapshot.snapshot_id}

    seed_rows = state_sync.engine_adapter.fetchall(
        "SELECT name, version, content FROM sqlmesh._seeds"
    )
    expected_row = (snapshot.name, snapshot.version, snapshot.model.seed.content)
    assert seed_rows == [expected_row]

    # The expired snapshot is reported for full (not dev-only) cleanup.
    cleanup_tasks = state_sync.delete_expired_snapshots()
    expected_task = SnapshotTableCleanupTask(
        snapshot=snapshot.table_info, dev_table_only=False
    )
    assert cleanup_tasks == [expected_task]

    # Neither the snapshot nor its seed row remains afterwards.
    remaining_snapshots = state_sync.get_snapshots(None)
    assert not remaining_snapshots
    remaining_seed_rows = state_sync.engine_adapter.fetchall("SELECT * FROM sqlmesh._seeds")
    assert not remaining_seed_rows
1079+
1080+
10471081
def test_delete_expired_snapshots_batching(
10481082
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
10491083
):
@@ -1581,6 +1615,8 @@ def test_migrate_rows(state_sync: EngineAdapterStateSync, mocker: MockerFixture)
15811615
},
15821616
)
15831617

1618+
state_sync.engine_adapter.drop_table("sqlmesh._seeds")
1619+
15841620
old_snapshots = state_sync.engine_adapter.fetchdf("select * from sqlmesh._snapshots")
15851621
old_environments = state_sync.engine_adapter.fetchdf("select * from sqlmesh._environments")
15861622

@@ -2124,28 +2160,29 @@ def test_snapshot_batching(state_sync, mocker, make_snapshot):
21242160
state_sync.SNAPSHOT_BATCH_SIZE = 2
21252161
state_sync.engine_adapter = mock
21262162

2163+
snapshot_a = make_snapshot(SqlModel(name="a", query=parse_one("select 1")), "1")
2164+
snapshot_b = make_snapshot(SqlModel(name="a", query=parse_one("select 2")), "2")
2165+
snapshot_c = make_snapshot(SqlModel(name="a", query=parse_one("select 3")), "3")
2166+
21272167
state_sync.delete_snapshots(
21282168
(
2129-
SnapshotId(name="a", identifier="1"),
2130-
SnapshotId(name="a", identifier="2"),
2131-
SnapshotId(name="a", identifier="3"),
2169+
snapshot_a,
2170+
snapshot_b,
2171+
snapshot_c,
21322172
)
21332173
)
21342174
calls = mock.delete_from.call_args_list
21352175
assert mock.delete_from.call_args_list == [
21362176
call(
21372177
exp.to_table("sqlmesh._snapshots"),
2138-
where=parse_one("(name, identifier) in (('a', '1'), ('a', '2'))"),
2139-
),
2140-
call(
2141-
exp.to_table("sqlmesh._seeds"),
2142-
where=parse_one("(name, identifier) in (('a', '1'), ('a', '2'))"),
2178+
where=parse_one(
2179+
f"(name, identifier) in (('\"a\"', '{snapshot_b.identifier}'), ('\"a\"', '{snapshot_a.identifier}'))"
2180+
),
21432181
),
21442182
call(
21452183
exp.to_table("sqlmesh._snapshots"),
2146-
where=parse_one("(name, identifier) in (('a', '3'))"),
2184+
where=parse_one(f"(name, identifier) in (('\"a\"', '{snapshot_c.identifier}'))"),
21472185
),
2148-
call(exp.to_table("sqlmesh._seeds"), where=parse_one("(name, identifier) in (('a', '3'))")),
21492186
]
21502187

21512188
mock.fetchall.side_effect = [

0 commit comments

Comments
 (0)