Skip to content

Commit 0e6ef73

Browse files
authored
Fix: Pass table properties when replacing a query in the Spark engine adapter (#1736)
1 parent 10e9d66 commit 0e6ef73

File tree

3 files changed

+33
-2
lines changed

3 files changed

+33
-2
lines changed

sqlmesh/core/engine_adapter/mixins.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ def overwrite_target_from_temp(
7676
query: Query,
7777
columns_to_types: t.Dict[str, exp.DataType],
7878
target_table: TableName,
79+
**kwargs: t.Any,
7980
) -> None:
8081
"""
8182
Overwrites the target table from the temp table. This is used when the target table is self-referencing.
@@ -84,6 +85,7 @@ def overwrite_target_from_temp(
8485
exp.select(*columns_to_types).from_(target_table),
8586
target_table,
8687
columns_to_types,
88+
**kwargs,
8789
) as temp_table:
8890

8991
def replace_table(

sqlmesh/core/engine_adapter/spark.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,10 +372,10 @@ def replace_query(
372372

373373
if self_referencing:
374374
return LogicalReplaceQueryMixin.overwrite_target_from_temp(
375-
self, query, columns_to_types, target_table
375+
self, query, columns_to_types, target_table, **kwargs
376376
)
377377

378-
self.create_table(table_name, columns_to_types)
378+
self.create_table(table_name, columns_to_types, **kwargs)
379379
return self._insert_overwrite_by_condition(
380380
table_name, source_queries, columns_to_types, where=exp.true()
381381
)

tests/core/engine_adapter/test_spark.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,35 @@ def test_create_table_properties(make_mocked_engine_adapter: t.Callable):
5656
)
5757

5858

59+
def test_replace_query_table_properties(make_mocked_engine_adapter: t.Callable):
60+
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
61+
62+
columns_to_types = {
63+
"cola": exp.DataType.build("INT"),
64+
"colb": exp.DataType.build("TEXT"),
65+
"colc": exp.DataType.build("TEXT"),
66+
}
67+
adapter.replace_query(
68+
"test_table",
69+
parse_one("SELECT 1 AS cola, '2' AS colb, '3' AS colc"),
70+
columns_to_types=columns_to_types,
71+
partitioned_by=[exp.to_column("colb")],
72+
storage_format="ICEBERG",
73+
table_properties={"a": exp.convert(1)},
74+
)
75+
76+
adapter.cursor.execute.assert_has_calls(
77+
[
78+
call(
79+
"CREATE TABLE IF NOT EXISTS `test_table` (`cola` INT, `colb` STRING, `colc` STRING) USING ICEBERG PARTITIONED BY (`colb`) TBLPROPERTIES ('a'=1)"
80+
),
81+
call(
82+
"INSERT OVERWRITE TABLE `test_table` (`cola`, `colb`, `colc`) SELECT `cola`, `colb`, `colc` FROM (SELECT 1 AS `cola`, '2' AS `colb`, '3' AS `colc`) AS `_subquery` WHERE TRUE"
83+
),
84+
]
85+
)
86+
87+
5988
def test_create_view_properties(make_mocked_engine_adapter: t.Callable):
6089
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
6190

0 commit comments

Comments
 (0)