Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# ea_airflow_util v0.3.8
## New Features
- Add `dbt deps` step to `RunDbtDag` before `dbt seed` to ensure dbt packages are installed before execution.

## Fixes
- Fix bug in `RunDbtDag` where `test_vars` was incorrectly assigned `run_vars` instead of the user-provided `test_vars`.


# ea_airflow_util v0.3.7
## Fixes
- Fix change in interface in `SlackWebhookHook` instantiation in Slack callables.
Expand Down
21 changes: 15 additions & 6 deletions ea_airflow_util/dags/run_dbt_airflow_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.task_group import TaskGroup

from airflow_dbt.operators.dbt_operator import DbtRunOperator, DbtSeedOperator, DbtTestOperator
from airflow_dbt.operators.dbt_operator import DbtDepsOperator, DbtRunOperator, DbtSeedOperator, DbtTestOperator

from ea_airflow_util.dags.ea_custom_dag import EACustomDAG
from ea_airflow_util.callables.variable import check_variable, update_variable
Expand Down Expand Up @@ -79,7 +79,7 @@ def __init__(self,
# run-time vars
self.seed_vars = seed_vars
self.run_vars = run_vars
self.test_vars = run_vars
self.test_vars = test_vars

# bluegreen
self.opt_swap = opt_swap
Expand Down Expand Up @@ -147,9 +147,10 @@ def __init__(self,
# build function for tasks
def build_dbt_run(self, on_success_callback=None, **kwargs):
"""
four tasks defined here:
tasks defined here:

dbt seed:
dbt deps:
dbt seed:
dbt run:
dbt test:
dbt swap: bluegreen step, not required
Expand All @@ -168,6 +169,14 @@ def build_dbt_run(self, on_success_callback=None, **kwargs):
dag=self.dag
) as dbt_task_group:

dbt_deps = DbtDepsOperator(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to use the --upgrade flag here to actually update the package versions during the run? Otherwise, it will use the package versions already defined in package-lock.yml.

task_id=f'dbt_deps_{self.environment}',
dir=self.dbt_repo_path,
target=self.dbt_target_name,
dbt_bin=self.dbt_bin_path,
dag=self.dag
)

dbt_seed = DbtSeedOperator(
task_id= f'dbt_seed_{self.environment}',
dir = self.dbt_repo_path,
Expand Down Expand Up @@ -198,7 +207,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs):
dag=self.dag
)

dbt_seed >> dbt_run >> dbt_test
dbt_deps >> dbt_seed >> dbt_run >> dbt_test


# bluegreen operator
Expand Down Expand Up @@ -252,7 +261,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs):
dag=self.dag
)

dbt_build_artifact_tables >> dbt_seed
dbt_deps >> dbt_build_artifact_tables >> dbt_seed

# Trigger downstream DAG when `dbt run` succeeds
if self.external_dags:
Expand Down