From a80a088c7576b27f6704b28a5c5543fe8120e45b Mon Sep 17 00:00:00 2001 From: Angelica Lastra Date: Mon, 6 Apr 2026 13:23:50 -0700 Subject: [PATCH 1/2] update dbt run dag to include dbt deps before seed or run so packages remain updated without needing to do it manually --- CHANGELOG.md | 8 ++++++++ ea_airflow_util/dags/run_dbt_airflow_dag.py | 16 ++++++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index edad10f..a02643a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# ea_airflow_util v0.3.8 +## New Features +- Add `dbt deps` step to `RunDbtDag` before `dbt seed` to ensure dbt packages are installed before execution. + +## Fixes +- Fix bug in `RunDbtDag` where `test_vars` was incorrectly assigned `run_vars` instead of the user-provided `test_vars`. + + # ea_airflow_util v0.3.7 ## Fixes - Fix change in interface in `SlackWebhookHook` instantiation in Slack callables. diff --git a/ea_airflow_util/dags/run_dbt_airflow_dag.py b/ea_airflow_util/dags/run_dbt_airflow_dag.py index eca5ab6..0cb2e4b 100644 --- a/ea_airflow_util/dags/run_dbt_airflow_dag.py +++ b/ea_airflow_util/dags/run_dbt_airflow_dag.py @@ -11,7 +11,7 @@ from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.utils.task_group import TaskGroup -from airflow_dbt.operators.dbt_operator import DbtRunOperator, DbtSeedOperator, DbtTestOperator +from airflow_dbt.operators.dbt_operator import DbtDepsOperator, DbtRunOperator, DbtSeedOperator, DbtTestOperator from ea_airflow_util.dags.ea_custom_dag import EACustomDAG from ea_airflow_util.callables.variable import check_variable, update_variable @@ -79,7 +79,7 @@ def __init__(self, # run-time vars self.seed_vars = seed_vars self.run_vars = run_vars - self.test_vars = run_vars + self.test_vars = test_vars # bluegreen self.opt_swap = opt_swap @@ -168,6 +168,14 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) as dbt_task_group: + dbt_deps = DbtDepsOperator( + task_id=f'dbt_deps_{self.environment}', + dir=self.dbt_repo_path, + target=self.dbt_target_name, + dbt_bin=self.dbt_bin_path, + dag=self.dag + ) + dbt_seed = DbtSeedOperator( task_id= f'dbt_seed_{self.environment}', dir = self.dbt_repo_path, @@ -198,7 +206,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) - dbt_seed >> dbt_run >> dbt_test + dbt_deps >> dbt_seed >> dbt_run >> dbt_test # bluegreen operator @@ -252,7 +260,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) - dbt_build_artifact_tables >> dbt_seed + dbt_build_artifact_tables >> dbt_deps # Trigger downstream DAG when `dbt run` succeeds if self.external_dags: From 4c86dbd2918dc39be438dca2a616bf4fa0cf825d Mon Sep 17 00:00:00 2001 From: Angelica Lastra Date: Mon, 6 Apr 2026 13:28:50 -0700 Subject: [PATCH 2/2] dbt deps should comme before build artifacts --- ea_airflow_util/dags/run_dbt_airflow_dag.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ea_airflow_util/dags/run_dbt_airflow_dag.py b/ea_airflow_util/dags/run_dbt_airflow_dag.py index 0cb2e4b..a8bb320 100644 --- a/ea_airflow_util/dags/run_dbt_airflow_dag.py +++ b/ea_airflow_util/dags/run_dbt_airflow_dag.py @@ -147,9 +147,10 @@ def __init__(self, # build function for tasks def build_dbt_run(self, on_success_callback=None, **kwargs): """ - four tasks defined here: + tasks defined here: - dbt seed: + dbt deps: + dbt seed: dbt run: dbt test: dbt swap: bluegreen step, not required @@ -260,7 +261,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) - dbt_build_artifact_tables >> dbt_deps + dbt_deps >> dbt_build_artifact_tables >> dbt_seed # Trigger downstream DAG when `dbt run` succeeds if self.external_dags: