Skip to content

Commit 0e70f7e

Browse files
committed
Fix: An individual plan DAG failure shouldn't prevent the remaining DAGs from loading in Airflow
1 parent 8d8b7e4 commit 0e70f7e

File tree

2 files changed

+7
-3
lines changed

2 files changed

+7
-3
lines changed

sqlmesh/schedulers/airflow/dag_generator.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,12 @@ def generate_cadence_dags(self, snapshots: t.Iterable[SnapshotIdLike]) -> t.List
9090
dags.append(self._create_cadence_dag_for_snapshot(snapshot, snapshots))
9191
return dags
9292

93-
def generate_plan_application_dag(self, spec: common.PlanDagSpec) -> DAG:
94-
return self._create_plan_application_dag(spec)
93+
def generate_plan_application_dag(self, spec: common.PlanDagSpec) -> t.Optional[DAG]:
94+
try:
95+
return self._create_plan_application_dag(spec)
96+
except Exception:
97+
logger.exception("Failed to generate the plan application DAG '%s'", spec.request_id)
98+
return None
9599

96100
def _create_cadence_dag_for_snapshot(
97101
self, snapshot: Snapshot, snapshots: t.Dict[SnapshotId, Snapshot]

sqlmesh/schedulers/airflow/integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ def dags(self) -> t.List[DAG]:
144144
self._create_janitor_dag(),
145145
]
146146

147-
return system_dags + cadence_dags + plan_application_dags
147+
return system_dags + cadence_dags + [d for d in plan_application_dags if d]
148148

149149
def _create_janitor_dag(self) -> DAG:
150150
dag = self._create_system_dag(common.JANITOR_DAG_ID, self._janitor_interval)

0 commit comments

Comments
 (0)