diff --git a/CHANGELOG.md b/CHANGELOG.md index edad10f..a02643a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# ea_airflow_util v0.3.8 +## New Features +- Add `dbt deps` step to `RunDbtDag` before `dbt seed` to ensure dbt packages are installed before execution. + +## Fixes +- Fix bug in `RunDbtDag` where `test_vars` was incorrectly assigned `run_vars` instead of the user-provided `test_vars`. + + # ea_airflow_util v0.3.7 ## Fixes - Fix change in interface in `SlackWebhookHook` instantiation in Slack callables. diff --git a/ea_airflow_util/dags/run_dbt_airflow_dag.py b/ea_airflow_util/dags/run_dbt_airflow_dag.py index eca5ab6..a8bb320 100644 --- a/ea_airflow_util/dags/run_dbt_airflow_dag.py +++ b/ea_airflow_util/dags/run_dbt_airflow_dag.py @@ -11,7 +11,7 @@ from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.utils.task_group import TaskGroup -from airflow_dbt.operators.dbt_operator import DbtRunOperator, DbtSeedOperator, DbtTestOperator +from airflow_dbt.operators.dbt_operator import DbtDepsOperator, DbtRunOperator, DbtSeedOperator, DbtTestOperator from ea_airflow_util.dags.ea_custom_dag import EACustomDAG from ea_airflow_util.callables.variable import check_variable, update_variable @@ -79,7 +79,7 @@ def __init__(self, # run-time vars self.seed_vars = seed_vars self.run_vars = run_vars - self.test_vars = run_vars + self.test_vars = test_vars # bluegreen self.opt_swap = opt_swap @@ -147,9 +147,10 @@ def __init__(self, # build function for tasks def build_dbt_run(self, on_success_callback=None, **kwargs): """ - four tasks defined here: + tasks defined here: - dbt seed: + dbt deps: + dbt seed: dbt run: dbt test: dbt swap: bluegreen step, not required @@ -168,6 +169,14 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) as dbt_task_group: + dbt_deps = DbtDepsOperator( + task_id=f'dbt_deps_{self.environment}', + dir=self.dbt_repo_path, + target=self.dbt_target_name, + dbt_bin=self.dbt_bin_path, + dag=self.dag + ) + dbt_seed = DbtSeedOperator( task_id= f'dbt_seed_{self.environment}', dir = self.dbt_repo_path, @@ -198,7 +207,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) - dbt_seed >> dbt_run >> dbt_test + dbt_deps >> dbt_seed >> dbt_run >> dbt_test # bluegreen operator @@ -252,7 +261,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) - dbt_build_artifact_tables >> dbt_seed + dbt_deps >> dbt_build_artifact_tables >> dbt_seed # Trigger downstream DAG when `dbt run` succeeds if self.external_dags: