Skip to content

Commit 07701e6

Browse files
committed
Fix: parse runtime-rendered fields, extract python env deps from merge_filter
1 parent 61455f2 commit 07701e6

File tree

2 files changed

+128
-14
lines changed

2 files changed

+128
-14
lines changed

sqlmesh/core/model/definition.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@
7777
RUNTIME_RENDERED_MODEL_FIELDS = {
7878
"audits",
7979
"signals",
80-
"description",
81-
"cron",
8280
"merge_filter",
8381
} | PROPERTIES
8482

@@ -2469,6 +2467,9 @@ def _create_model(
24692467
if isinstance(property_values, exp.Tuple):
24702468
statements.extend(property_values.expressions)
24712469

2470+
if isinstance(getattr(kwargs.get("kind"), "merge_filter", None), exp.Expression):
2471+
statements.append(kwargs["kind"].merge_filter)
2472+
24722473
jinja_macro_references, used_variables = extract_macro_references_and_variables(
24732474
*(gen(e if isinstance(e, exp.Expression) else e[0]) for e in statements)
24742475
)
@@ -2749,23 +2750,37 @@ def render_field_value(value: t.Any) -> t.Any:
27492750

27502751
return value
27512752

2753+
def parse_strings_with_macro_refs(value: t.Any) -> t.Any:
2754+
if isinstance(value, str) and "@" in value:
2755+
return exp.maybe_parse(value, dialect=dialect)
2756+
2757+
if isinstance(value, dict):
2758+
for k, v in dict(value).items():
2759+
value[k] = parse_strings_with_macro_refs(v)
2760+
elif isinstance(value, list):
2761+
value = [parse_strings_with_macro_refs(v) for v in value]
2762+
2763+
return value
2764+
27522765
for field_name, field_info in ModelMeta.all_field_infos().items():
27532766
field = field_info.alias or field_name
2767+
field_value = fields.get(field)
27542768

2755-
if field in RUNTIME_RENDERED_MODEL_FIELDS:
2769+
if field in ("cron", "description") or field_value is None:
27562770
continue
27572771

2758-
field_value = fields.get(field)
2759-
if field_value is None:
2772+
if field in RUNTIME_RENDERED_MODEL_FIELDS:
2773+
fields[field] = parse_strings_with_macro_refs(field_value)
27602774
continue
27612775

27622776
if isinstance(field_value, dict):
27632777
rendered_dict = {}
27642778
for key, value in field_value.items():
27652779
if key in RUNTIME_RENDERED_MODEL_FIELDS:
2766-
rendered_dict[key] = value
2780+
rendered_dict[key] = parse_strings_with_macro_refs(value)
27672781
elif (rendered := render_field_value(value)) is not None:
27682782
rendered_dict[key] = rendered
2783+
27692784
if rendered_dict:
27702785
fields[field] = rendered_dict
27712786
else:

tests/core/test_model.py

Lines changed: 107 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6480,26 +6480,110 @@ def model_with_macros(evaluator, **kwargs):
64806480
assert query.sql() == """SELECT 'test_value' AS "a" """.strip()
64816481

64826482

6483+
def test_unrendered_macros_sql_model(mocker: MockerFixture) -> None:
6484+
model = load_sql_based_model(
6485+
parse(
6486+
"""
6487+
MODEL (
6488+
name db.employees,
6489+
kind INCREMENTAL_BY_UNIQUE_KEY (
6490+
unique_key @{key},
6491+
merge_filter source.id > 0 and target.updated_at < @end_ds and source.updated_at > @start_ds and @merge_filter_var
6492+
),
6493+
cron '@daily',
6494+
allow_partials @IF(@gateway = 'dev', True, False),
6495+
physical_properties (
6496+
location1 = @'s3://bucket/prefix/@{schema_name}/@{table_name}',
6497+
location2 = @IF(@gateway = 'dev', @'hdfs://@{catalog_name}/@{schema_name}/dev/@{table_name}', @'s3://prod/@{table_name}'),
6498+
foo = @physical_var
6499+
),
6500+
virtual_properties (
6501+
creatable_type = @{create_type},
6502+
bar = @virtual_var,
6503+
),
6504+
session_properties (
6505+
'spark.executor.cores' = @IF(@gateway = 'dev', 1, 2),
6506+
'spark.executor.memory' = '1G',
6507+
baz = @session_var
6508+
),
6509+
);
6510+
6511+
SELECT * FROM src;
6512+
"""
6513+
),
6514+
variables={
6515+
"bar": "suffix",
6516+
"gateway": "dev",
6517+
"key": "a",
6518+
"create_type": "'SECURE'",
6519+
"merge_filter_var": True,
6520+
"physical_var": "bla",
6521+
"virtual_var": "blb",
6522+
"session_var": "blc",
6523+
},
6524+
)
6525+
6526+
assert model.python_env[c.SQLMESH_VARS] == Executable.value(
6527+
{
6528+
"gateway": "dev",
6529+
"create_type": "'SECURE'",
6530+
"merge_filter_var": True,
6531+
"physical_var": "bla",
6532+
"virtual_var": "blb",
6533+
"session_var": "blc",
6534+
}
6535+
)
6536+
6537+
assert "location1" in model.physical_properties
6538+
assert "location2" in model.physical_properties
6539+
6540+
# The properties will stay unrendered at load time
6541+
assert model.session_properties == {
6542+
"spark.executor.cores": exp.maybe_parse("@IF(@gateway = 'dev', 1, 2)"),
6543+
"spark.executor.memory": "1G",
6544+
"baz": exp.maybe_parse("@session_var"),
6545+
}
6546+
assert model.virtual_properties["creatable_type"] == exp.maybe_parse("@{create_type}")
6547+
6548+
assert (
6549+
model.physical_properties["location1"].sql()
6550+
== "@'s3://bucket/prefix/@{schema_name}/@{table_name}'"
6551+
)
6552+
assert (
6553+
model.physical_properties["location2"].sql()
6554+
== "@IF(@gateway = 'dev', @'hdfs://@{catalog_name}/@{schema_name}/dev/@{table_name}', @'s3://prod/@{table_name}')"
6555+
)
6556+
6557+
# merge_filter will stay unrendered as well
6558+
assert model.unique_key[0] == exp.column("a", quoted=True)
6559+
assert (
6560+
t.cast(exp.Expression, model.merge_filter).sql()
6561+
== '"__merge_source__"."id" > 0 AND "__merge_target__"."updated_at" < @end_ds AND "__merge_source__"."updated_at" > @start_ds AND @merge_filter_var'
6562+
)
6563+
6564+
64836565
def test_unrendered_macros_python_model(mocker: MockerFixture) -> None:
64846566
@model(
64856567
"test_unrendered_macros_python_model_@{bar}",
64866568
is_sql=True,
64876569
kind=dict(
64886570
name=ModelKindName.INCREMENTAL_BY_UNIQUE_KEY,
64896571
unique_key="@{key}",
6490-
merge_filter="source.id > 0 and target.updated_at < @end_ds and source.updated_at > @start_ds",
6572+
merge_filter="source.id > 0 and target.updated_at < @end_ds and source.updated_at > @start_ds and @merge_filter_var",
64916573
),
64926574
cron="@daily",
64936575
columns={"a": "string"},
64946576
allow_partials="@IF(@gateway = 'dev', True, False)",
64956577
physical_properties=dict(
64966578
location1="@'s3://bucket/prefix/@{schema_name}/@{table_name}'",
64976579
location2="@IF(@gateway = 'dev', @'hdfs://@{catalog_name}/@{schema_name}/dev/@{table_name}', @'s3://prod/@{table_name}')",
6580+
foo="@physical_var",
64986581
),
6499-
virtual_properties={"creatable_type": "@{create_type}"},
6582+
virtual_properties={"creatable_type": "@{create_type}", "bar": "@virtual_var"},
65006583
session_properties={
65016584
"spark.executor.cores": "@IF(@gateway = 'dev', 1, 2)",
65026585
"spark.executor.memory": "1G",
6586+
"baz": "@session_var",
65036587
},
65046588
)
65056589
def model_with_macros(evaluator, **kwargs):
@@ -6517,12 +6601,24 @@ def model_with_macros(evaluator, **kwargs):
65176601
"gateway": "dev",
65186602
"key": "a",
65196603
"create_type": "'SECURE'",
6604+
"merge_filter_var": True,
6605+
"physical_var": "bla",
6606+
"virtual_var": "blb",
6607+
"session_var": "blc",
65206608
},
65216609
)
65226610

65236611
assert python_sql_model.name == "test_unrendered_macros_python_model_suffix"
65246612
assert python_sql_model.python_env[c.SQLMESH_VARS] == Executable.value(
6525-
{"test_var_a": "test_value"}
6613+
{
6614+
"test_var_a": "test_value",
6615+
"gateway": "dev",
6616+
"create_type": "'SECURE'",
6617+
"merge_filter_var": True,
6618+
"physical_var": "bla",
6619+
"virtual_var": "blb",
6620+
"session_var": "blc",
6621+
}
65266622
)
65276623
assert python_sql_model.enabled
65286624

@@ -6536,25 +6632,28 @@ def model_with_macros(evaluator, **kwargs):
65366632

65376633
# The properties will stay unrendered at load time
65386634
assert python_sql_model.session_properties == {
6539-
"spark.executor.cores": "@IF(@gateway = 'dev', 1, 2)",
6635+
"spark.executor.cores": exp.maybe_parse("@IF(@gateway = 'dev', 1, 2)"),
65406636
"spark.executor.memory": "1G",
6637+
"baz": exp.maybe_parse("@session_var"),
65416638
}
6542-
assert python_sql_model.virtual_properties["creatable_type"] == exp.convert("@{create_type}")
6639+
assert python_sql_model.virtual_properties["creatable_type"] == exp.maybe_parse(
6640+
"@{create_type}"
6641+
)
65436642

65446643
assert (
6545-
python_sql_model.physical_properties["location1"].text("this")
6644+
python_sql_model.physical_properties["location1"].sql()
65466645
== "@'s3://bucket/prefix/@{schema_name}/@{table_name}'"
65476646
)
65486647
assert (
6549-
python_sql_model.physical_properties["location2"].text("this")
6648+
python_sql_model.physical_properties["location2"].sql()
65506649
== "@IF(@gateway = 'dev', @'hdfs://@{catalog_name}/@{schema_name}/dev/@{table_name}', @'s3://prod/@{table_name}')"
65516650
)
65526651

65536652
# merge_filter will stay unrendered as well
65546653
assert python_sql_model.unique_key[0] == exp.column("a", quoted=True)
65556654
assert (
65566655
python_sql_model.merge_filter.sql()
6557-
== '"source"."id" > 0 AND "target"."updated_at" < @end_ds AND "source"."updated_at" > @start_ds'
6656+
== '"__merge_source__"."id" > 0 AND "__merge_target__"."updated_at" < @end_ds AND "__merge_source__"."updated_at" > @start_ds AND @merge_filter_var'
65586657
)
65596658

65606659

0 commit comments

Comments
 (0)