Skip to content

Commit 57f3427

Browse files
committed
Feat: Tag BigQuery queries with their correlation_id as label
1 parent 84d5341 commit 57f3427

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ def _job_params(self) -> t.Dict[str, t.Any]:
134134
}
135135
if self._extra_config.get("maximum_bytes_billed"):
136136
params["maximum_bytes_billed"] = self._extra_config.get("maximum_bytes_billed")
137+
if self.correlation_id:
138+
# BigQuery label keys must be lowercase
139+
key = self.correlation_id.job_type.value.lower()
140+
params["labels"] = {key: self.correlation_id.job_id}
137141
return params
138142

139143
@property

tests/core/engine_adapter/integration/test_integration_bigquery.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
from sqlmesh.core.model import SqlModel, load_sql_based_model
1414
from sqlmesh.core.plan import Plan
1515
from sqlmesh.core.table_diff import TableDiff
16-
from tests.core.engine_adapter.integration import TestContext
16+
from sqlmesh.utils import CorrelationId
1717
from pytest import FixtureRequest
18+
from tests.core.engine_adapter.integration import TestContext
1819
from tests.core.engine_adapter.integration import (
1920
TestContext,
2021
generate_pytest_params,
@@ -400,3 +401,24 @@ def test_table_diff_table_name_matches_column_name(ctx: TestContext):
400401

401402
assert row_diff.stats["join_count"] == 1
402403
assert row_diff.full_match_count == 1
404+
405+
406+
def test_correlation_id_in_bigquery_job_labels(ctx: TestContext):
407+
model_name = ctx.table("test")
408+
409+
sqlmesh = ctx.create_context()
410+
411+
sqlmesh.upsert_model(
412+
load_sql_based_model(d.parse(f"MODEL (name {model_name}, kind FULL); SELECT 1 AS col"))
413+
)
414+
415+
plan: Plan = sqlmesh.plan(auto_apply=True, no_prompts=True)
416+
417+
correlation_id = CorrelationId.from_plan_id(plan.plan_id)
418+
snapshot_evaluator = sqlmesh.snapshot_evaluator(correlation_id)
419+
adapter = t.cast(BigQueryEngineAdapter, snapshot_evaluator.adapter)
420+
421+
assert adapter.correlation_id is not None
422+
423+
labels = adapter._job_params.get("labels")
424+
assert labels == {correlation_id.job_type.value.lower(): correlation_id.job_id}

0 commit comments

Comments
 (0)