Skip to content

Commit b0691e5

Browse files
committed
PR Feedback 1
1 parent 7144853 commit b0691e5

File tree

5 files changed

+29
-25
lines changed

5 files changed

+29
-25
lines changed

sqlmesh/core/context.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,14 @@ def engine_adapter(self) -> EngineAdapter:
451451
@property
452452
def snapshot_evaluator(self) -> SnapshotEvaluator:
453453
if not self._snapshot_evaluator:
454-
self._snapshot_evaluator = self._create_snapshot_evaluator(log_level=logging.INFO)
454+
self._snapshot_evaluator = SnapshotEvaluator(
455+
{
456+
gateway: adapter.with_settings(log_level=logging.INFO)
457+
for gateway, adapter in self.engine_adapters.items()
458+
},
459+
ddl_concurrent_tasks=self.concurrent_tasks,
460+
selected_gateway=self.selected_gateway,
461+
)
455462

456463
return self._snapshot_evaluator
457464

@@ -1931,7 +1938,7 @@ def _table_diff(
19311938
)
19321939

19331940
return TableDiff(
1934-
adapter=adapter.with_settings(logger.getEffectiveLevel()),
1941+
adapter=adapter.with_settings(log_level=logger.getEffectiveLevel()),
19351942
source=source,
19361943
target=target,
19371944
on=on,
@@ -3064,16 +3071,6 @@ def load_model_tests(
30643071

30653072
return model_tests
30663073

3067-
def _create_snapshot_evaluator(self, **kwargs: t.Any) -> SnapshotEvaluator:
3068-
return SnapshotEvaluator(
3069-
{
3070-
gateway: adapter.with_settings(**kwargs)
3071-
for gateway, adapter in self.engine_adapters.items()
3072-
},
3073-
ddl_concurrent_tasks=self.concurrent_tasks,
3074-
selected_gateway=self.selected_gateway,
3075-
)
3076-
30773074

30783075
class Context(GenericContext[Config]):
30793076
CONFIG_TYPE = Config

sqlmesh/core/engine_adapter/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,13 @@ def __init__(
147147
self._multithreaded = multithreaded
148148
self.correlation_id = correlation_id
149149

150-
def with_settings(self, log_level: int = logging.DEBUG, **kwargs: t.Any) -> EngineAdapter:
150+
def with_settings(self, **kwargs: t.Any) -> EngineAdapter:
151151
adapter = self.__class__(
152152
self._connection_pool,
153153
dialect=self.dialect,
154154
sql_gen_kwargs=self._sql_gen_kwargs,
155155
default_catalog=self._default_catalog,
156-
execute_log_level=log_level,
156+
execute_log_level=kwargs.pop("log_level", self._execute_log_level),
157157
register_comments=self._register_comments,
158158
null_connection=self._extra_config.pop("null_connection", True),
159159
multithreaded=self._multithreaded,

sqlmesh/core/plan/evaluator.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,10 @@ def evaluate(
8989
circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
9090
) -> None:
9191
self._circuit_breaker = circuit_breaker
92+
self.snapshot_evaluator = self.snapshot_evaluator.set_correlation_id(
93+
CorrelationId.from_plan_id(plan.plan_id)
94+
)
9295

93-
self.set_correlation_id(CorrelationId.from_plan_id(plan.plan_id))
9496
self.console.start_plan_evaluation(plan)
9597
analytics.collector.on_plan_apply_start(
9698
plan=plan,
@@ -351,13 +353,6 @@ def visit_finalize_environment_stage(
351353
) -> None:
352354
self.state_sync.finalize(plan.environment)
353355

354-
def set_correlation_id(self, correlation_id: CorrelationId) -> None:
355-
for key, adapter in self.snapshot_evaluator.adapters.items():
356-
if correlation_id != adapter.correlation_id:
357-
self.snapshot_evaluator.adapters[key] = adapter.with_settings(
358-
correlation_id=correlation_id
359-
)
360-
361356
def _promote_snapshots(
362357
self,
363358
plan: EvaluatablePlan,

sqlmesh/core/snapshot/evaluator.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
SnapshotTableCleanupTask,
6262
)
6363
from sqlmesh.core.snapshot.definition import parent_snapshots_by_name
64-
from sqlmesh.utils import random_id
64+
from sqlmesh.utils import random_id, CorrelationId
6565
from sqlmesh.utils.concurrency import (
6666
concurrent_apply_to_snapshots,
6767
concurrent_apply_to_values,
@@ -1190,6 +1190,18 @@ def _execute_create(
11901190
)
11911191
adapter.execute(snapshot.model.render_post_statements(**create_render_kwargs))
11921192

1193+
def set_correlation_id(self, correlation_id: CorrelationId) -> SnapshotEvaluator:
1194+
return SnapshotEvaluator(
1195+
{
1196+
gateway: adapter.with_settings(
1197+
log_level=adapter._execute_log_level, correlation_id=correlation_id
1198+
)
1199+
for gateway, adapter in self.adapters.items()
1200+
},
1201+
self.ddl_concurrent_tasks,
1202+
self.selected_gateway,
1203+
)
1204+
11931205

11941206
def _evaluation_strategy(snapshot: SnapshotInfoLike, adapter: EngineAdapter) -> EvaluationStrategy:
11951207
klass: t.Type

tests/core/test_table_diff.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,9 +337,9 @@ def test_generated_sql(sushi_context_fixed_date: Context, mocker: MockerFixture)
337337

338338
# make with_settings() return the current instance of engine_adapter so we can still spy on _execute
339339
mocker.patch.object(
340-
engine_adapter, "with_settings", new_callable=lambda: lambda _: engine_adapter
340+
engine_adapter, "with_settings", new_callable=lambda: lambda **kwargs: engine_adapter
341341
)
342-
assert engine_adapter.with_settings(1) == engine_adapter
342+
assert engine_adapter.with_settings() == engine_adapter
343343

344344
spy_execute = mocker.spy(engine_adapter, "_execute")
345345
mocker.patch("sqlmesh.core.engine_adapter.base.random_id", return_value="abcdefgh")

0 commit comments

Comments
 (0)