Skip to content

Commit fe6e7cd

Browse files
authored
Fix: correctly extract catalog name for table properties expression builder (#2196)
* Extract and pass catalog name to partition expression builder
* Add tests that pass a table to _build_create_table_exp
* Ensure schema.this is a table expression
1 parent 5ade5f4 commit fe6e7cd

File tree

4 files changed

+42
-9
lines changed

4 files changed

+42
-9
lines changed

sqlmesh/core/engine_adapter/base.py

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -688,11 +688,17 @@ def _build_create_table_exp(
688688
**kwargs: t.Any,
689689
) -> exp.Create:
690690
exists = False if replace else exists
691+
catalog_name = None
691692
if not isinstance(table_name_or_schema, exp.Schema):
692693
table_name_or_schema = exp.to_table(table_name_or_schema)
694+
catalog_name = table_name_or_schema.catalog
695+
else:
696+
if isinstance(table_name_or_schema.this, exp.Table):
697+
catalog_name = table_name_or_schema.this.catalog
698+
693699
properties = (
694700
self._build_table_properties_exp(
695-
**kwargs, table=table_name_or_schema.this, columns_to_types=columns_to_types
701+
**kwargs, catalog_name=catalog_name, columns_to_types=columns_to_types
696702
)
697703
if kwargs
698704
else None
@@ -1864,7 +1870,7 @@ def _table_properties_to_expressions(
18641870

18651871
def _build_table_properties_exp(
18661872
self,
1867-
table: exp.Table,
1873+
catalog_name: t.Optional[str] = None,
18681874
storage_format: t.Optional[str] = None,
18691875
partitioned_by: t.Optional[t.List[exp.Expression]] = None,
18701876
partition_interval_unit: t.Optional[IntervalUnit] = None,

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -459,7 +459,7 @@ def _build_description_property_exp(self, description: str) -> exp.Property:
459459

460460
def _build_table_properties_exp(
461461
self,
462-
table: exp.Table,
462+
catalog_name: t.Optional[str] = None,
463463
storage_format: t.Optional[str] = None,
464464
partitioned_by: t.Optional[t.List[exp.Expression]] = None,
465465
partition_interval_unit: t.Optional[IntervalUnit] = None,

sqlmesh/core/engine_adapter/mixins.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -128,7 +128,7 @@ def _insert_overwrite_by_condition(
128128
class HiveMetastoreTablePropertiesMixin(EngineAdapter):
129129
def _build_table_properties_exp(
130130
self,
131-
table: exp.Table,
131+
catalog_name: t.Optional[str] = None,
132132
storage_format: t.Optional[str] = None,
133133
partitioned_by: t.Optional[t.List[exp.Expression]] = None,
134134
partition_interval_unit: t.Optional[IntervalUnit] = None,
@@ -155,7 +155,7 @@ def _build_table_properties_exp(
155155

156156
if (
157157
self.dialect == "trino"
158-
and self.get_catalog_type(table.catalog or self.get_current_catalog()) == "iceberg"
158+
and self.get_catalog_type(catalog_name or self.get_current_catalog()) == "iceberg"
159159
):
160160
# On the Trino Iceberg catalog, the table property is called "partitioning" - not "partitioned_by"
161161
# In addition, partition column transform expressions like `day(col)` or `bucket(col, 5)` are allowed

tests/core/engine_adapter/test_trino.py

Lines changed: 31 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22

33
import pytest
44
from pytest_mock.plugin import MockerFixture
5-
from sqlglot import exp
5+
from sqlglot import exp, parse_one
66

77
from sqlmesh.core.engine_adapter import TrinoEngineAdapter
88
from tests.core.engine_adapter import to_sql_calls
@@ -73,8 +73,11 @@ def test_partitioned_by_hive(
7373

7474
adapter.create_table("test_table", columns_to_types, partitioned_by=[exp.to_column("colb")])
7575

76+
adapter.ctas("test_table", parse_one("select 1"), partitioned_by=[exp.to_column("colb")]) # type: ignore
77+
7678
assert to_sql_calls(adapter) == [
77-
"""CREATE TABLE IF NOT EXISTS "test_table" ("cola" INTEGER, "colb" VARCHAR) WITH (PARTITIONED_BY=ARRAY['colb'])"""
79+
"""CREATE TABLE IF NOT EXISTS "test_table" ("cola" INTEGER, "colb" VARCHAR) WITH (PARTITIONED_BY=ARRAY['colb'])""",
80+
"""CREATE TABLE IF NOT EXISTS "test_table" WITH (PARTITIONED_BY=ARRAY['colb']) AS SELECT 1""",
7881
]
7982

8083

@@ -96,8 +99,11 @@ def test_partitioned_by_iceberg(
9699

97100
adapter.create_table("test_table", columns_to_types, partitioned_by=[exp.to_column("colb")])
98101

102+
adapter.ctas("test_table", parse_one("select 1"), partitioned_by=[exp.to_column("colb")]) # type: ignore
103+
99104
assert to_sql_calls(adapter) == [
100-
"""CREATE TABLE IF NOT EXISTS "test_table" ("cola" INTEGER, "colb" VARCHAR) WITH (PARTITIONING=ARRAY['colb'])"""
105+
"""CREATE TABLE IF NOT EXISTS "test_table" ("cola" INTEGER, "colb" VARCHAR) WITH (PARTITIONING=ARRAY['colb'])""",
106+
"""CREATE TABLE IF NOT EXISTS "test_table" WITH (PARTITIONING=ARRAY['colb']) AS SELECT 1""",
101107
]
102108

103109

@@ -122,8 +128,15 @@ def test_partitioned_by_iceberg_transforms(
122128
partitioned_by=[exp.to_column("day(cola)"), exp.to_column("truncate(colb, 8)")],
123129
)
124130

131+
adapter.ctas(
132+
"test_table",
133+
parse_one("select 1"), # type: ignore
134+
partitioned_by=[exp.to_column("day(cola)"), exp.to_column("truncate(colb, 8)")],
135+
)
136+
125137
assert to_sql_calls(adapter) == [
126-
"""CREATE TABLE IF NOT EXISTS "test_table" ("cola" INTEGER, "colb" VARCHAR) WITH (PARTITIONING=ARRAY['day(cola)', 'truncate(colb, 8)'])"""
138+
"""CREATE TABLE IF NOT EXISTS "test_table" ("cola" INTEGER, "colb" VARCHAR) WITH (PARTITIONING=ARRAY['day(cola)', 'truncate(colb, 8)'])""",
139+
"""CREATE TABLE IF NOT EXISTS "test_table" WITH (PARTITIONING=ARRAY['day(cola)', 'truncate(colb, 8)']) AS SELECT 1""",
127140
]
128141

129142

@@ -141,13 +154,27 @@ def test_partitioned_by_with_multiple_catalogs_same_server(
141154
"datalake.test_schema.test_table", columns_to_types, partitioned_by=[exp.to_column("colb")]
142155
)
143156

157+
adapter.ctas(
158+
"datalake.test_schema.test_table",
159+
parse_one("select 1"), # type: ignore
160+
partitioned_by=[exp.to_column("colb")],
161+
)
162+
144163
adapter.create_table(
145164
"datalake_iceberg.test_schema.test_table",
146165
columns_to_types,
147166
partitioned_by=[exp.to_column("colb")],
148167
)
149168

169+
adapter.ctas(
170+
"datalake_iceberg.test_schema.test_table",
171+
parse_one("select 1"), # type: ignore
172+
partitioned_by=[exp.to_column("colb")],
173+
)
174+
150175
assert to_sql_calls(adapter) == [
151176
"""CREATE TABLE IF NOT EXISTS "datalake"."test_schema"."test_table" ("cola" INTEGER, "colb" VARCHAR) WITH (PARTITIONED_BY=ARRAY['colb'])""",
177+
"""CREATE TABLE IF NOT EXISTS "datalake"."test_schema"."test_table" WITH (PARTITIONED_BY=ARRAY['colb']) AS SELECT 1""",
152178
"""CREATE TABLE IF NOT EXISTS "datalake_iceberg"."test_schema"."test_table" ("cola" INTEGER, "colb" VARCHAR) WITH (PARTITIONING=ARRAY['colb'])""",
179+
"""CREATE TABLE IF NOT EXISTS "datalake_iceberg"."test_schema"."test_table" WITH (PARTITIONING=ARRAY['colb']) AS SELECT 1""",
153180
]

0 commit comments

Comments (0)