Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cloudbuild.sfdc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,17 @@ steps:
# Copy generated SFDC DAG sql files to Target GCS bucket
if [[ $(find generated_sql -type f 2> /dev/null | wc -l) -gt 0 ]]
then
echo "Copying SFDC SQL files to gs://${_TGT_BUCKET_}/dags/sfdc."
gsutil -m cp -r './generated_sql/*' gs://${_TGT_BUCKET_}/dags/
echo "Copying SFDC SQL files to gs://${_TGT_BUCKET_}/dags/ (sfdc)."
gcloud storage cp --recursive './generated_sql/*' gs://${_TGT_BUCKET_}/dags/
echo "✅ SFDC SQL files have been copied."
else
echo "🔪No SQL files found under generated_sql/sfdc directory or the directory does not exist. Skipping copy.🔪"
fi
# Copy generated SFDC DAG python and related files to Target GCS bucket
if [[ $(find generated_dag -type f 2> /dev/null | wc -l) -gt 0 ]]
then
echo "Copying SFDC DAG files to gs://${_TGT_BUCKET_}/dags/sfdc."
gsutil -m cp -r './generated_dag/*' gs://${_TGT_BUCKET_}/dags/
echo "Copying SFDC DAG files to gs://${_TGT_BUCKET_}/dags/ (sfdc)."
gcloud storage cp --recursive './generated_dag/*' gs://${_TGT_BUCKET_}/dags/
echo "✅ SFDC DAG files have been copied."
else
echo "🔪No Python files found under generated_dag/sfdc directory or the directory does not exist. Skipping copy.🔪"
Expand Down
55 changes: 33 additions & 22 deletions src/cdc_dag_generator/templates/airflow_dag_raw_to_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@
from pendulum import UTC

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow import __version__ as airflow_version
try:
from airflow.api.common.trigger_dag import trigger_dag
except ImportError:
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.exceptions import AirflowRescheduleException
from airflow.models.dagrun import DagRun
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.state import State
from airflow.utils.db import provide_session
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils import timezone

try:
from airflow.api.common.trigger_dag import trigger_dag
except ImportError:
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.utils.db import provide_session
from packaging.version import Version


_RAW_WAITING_TIMEOUT_MINUTES = 10
Expand Down Expand Up @@ -69,18 +69,19 @@ def check_raw_if_deployed(session=None, **kwargs):
del kwargs
now = Pendulum.now(UTC)

active_runs = DagRun.find(dag_id=_RAW_DAG_ID, state=State.RUNNING)
active_runs = DagRun.find(dag_id=_RAW_DAG_ID, state="running")
if active_runs and len(active_runs) > 0:
logging.info("Rescheduling to wait for an active run of the Raw DAG.")
raise AirflowRescheduleException(now + timedelta(
minutes=_RAW_WAITING_TIMEOUT_MINUTES))

complete_runs: list[DagRun] = DagRun.find(dag_id=_RAW_DAG_ID,
state=State.SUCCESS)
state="success")
run_raw_now = True
if complete_runs and len(complete_runs) > 0:
if (now - complete_runs[-1].execution_date
).total_hours() < _RAW_AGE_HOURS_MAX:
run_date = getattr(complete_runs[-1], "logical_date",
getattr(complete_runs[-1], "execution_date", None))
if (now - run_date).total_hours() < _RAW_AGE_HOURS_MAX:
run_raw_now = False
logging.info("Found a recent run of the Raw DAG.")

Expand All @@ -91,26 +92,36 @@ def check_raw_if_deployed(session=None, **kwargs):
logging.info("No Raw DAG %s found.", _RAW_DAG_ID)
return
logging.info("Starting a new run of the Raw DAG")
trigger_dag(
dag_id=_RAW_DAG_ID,
run_id=f"forced__{now.isoformat()}",
conf=None,
execution_date=timezone.utcnow(),
replace_microseconds=False,
)
trigger_kwargs = {
"dag_id": _RAW_DAG_ID,
"run_id": f"forced__{now.isoformat()}",
"conf": None,
}
if Version(airflow_version) >= Version("3.0.0"):
logical_date = timezone.utcnow().replace(microsecond=0)
trigger_kwargs["logical_date"] = logical_date
else:
trigger_kwargs["execution_date"] = timezone.utcnow()
trigger_kwargs["replace_microseconds"] = False
trigger_dag(**trigger_kwargs)
logging.info("Rescheduling to wait for a new run of the Raw DAG.")
raise AirflowRescheduleException(now + timedelta(
minutes=_RAW_WAITING_TIMEOUT_MINUTES))

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": "${load_frequency}"}
else:
schedule_kwarg = {"schedule_interval": "${load_frequency}"}

with DAG(dag_id=_IDENTIFIER,
description=(
"Merge from Salesforce RAW BQ dataset to CDC BQ dataset for "
"'${project_id}.${cdc_dataset}.${base_table}' table"),
default_args=default_args,
schedule_interval="${load_frequency}",
catchup=False,
tags=["sfdc","cdc"],
max_active_runs=1) as dag:
max_active_runs=1,
**schedule_kwarg) as dag:
check_raw = PythonOperator(task_id="check_" + _RAW_DAG_ID,
python_callable=check_raw_if_deployed,
dag=dag)
Expand Down
4 changes: 2 additions & 2 deletions src/common/materializer/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ echo "generate_dependent_dags.py completed successfully."
if [[ $(find generated_materializer_dag_files/*/*/task_dep_dags -type f 2> /dev/null | wc -l) -gt 0 ]]
then
echo "Copying DAG files to GCS bucket..."
echo "gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${GCS_TGT_BUCKET}/dags/"
gsutil -m cp -r 'generated_materializer_dag_files/*' "gs://${GCS_TGT_BUCKET}/dags/"
echo "gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${GCS_TGT_BUCKET}/dags/"
gcloud storage cp --recursive 'generated_materializer_dag_files/*' "gs://${GCS_TGT_BUCKET}/dags/"
else
echo "No task dependent DAG files to copy to GCS bucket!"
fi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from datetime import timedelta

import airflow
from airflow import __version__ as airflow_version
from packaging.version import Version
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.bigquery import \
BigQueryInsertJobOperator
Expand All @@ -34,18 +36,23 @@

default_dag_args = {
"depends_on_past": False,
"start_date": datetime(${year}, ${month}, ${day}),
"start_date": datetime(int("${year}"), int("${month}"), int("${day}")),
"catchup": False,
"retries": 1,
"retry_delay": timedelta(minutes=30),
}

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": "${load_frequency}"}
else:
schedule_kwarg = {"schedule_interval": "${load_frequency}"}

with airflow.DAG("${dag_full_name}",
default_args=default_dag_args,
catchup=False,
max_active_runs=1,
schedule_interval="${load_frequency}",
tags=${tags}) as dag:
tags=ast.literal_eval("${tags}"),
**schedule_kwarg) as dag:
start_task = EmptyOperator(task_id="start")
refresh_table = BigQueryInsertJobOperator(
task_id="refresh_table",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from datetime import timedelta

import airflow
from airflow import __version__ as airflow_version
from packaging.version import Version
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.bigquery import \
BigQueryInsertJobOperator
Expand All @@ -36,18 +38,23 @@

default_dag_args = {
"depends_on_past": False,
"start_date": datetime(${year}, ${month}, ${day}),
"start_date": datetime(int("${year}"), int("${month}"), int("${day}")),
"catchup": False,
"retries": 1,
"retry_delay": timedelta(minutes=30),
}

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": "${load_frequency}"}
else:
schedule_kwarg = {"schedule_interval": "${load_frequency}"}

with airflow.DAG("${dag_full_name}",
default_args=default_dag_args,
catchup=False,
max_active_runs=1,
schedule_interval="${load_frequency}",
tags=${tags}) as dag:
tags=ast.literal_eval("${tags}"),
**schedule_kwarg) as dag:

start_task = EmptyOperator(task_id="start")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ steps:
if [[ $(find generated_materializer_dag_files -type f 2> /dev/null | wc -l) -gt 0 ]]
then
echo "Copying DAG files to GCS bucket..."
echo "gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/"
gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/
echo "gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/"
gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/
else
echo "No files to copy to GCS bucket!"
fi
Expand Down
6 changes: 3 additions & 3 deletions src/common/py_libs/k9_deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ def _simple_process_and_upload(k9_id: str, k9_dir: str, jinja_dict: dict,
if "__init__.py" not in [str(p.relative_to(k9_dir)) for p in k9_files]:
with open(f"{tmp_dir}/__init__.py", "w", encoding="utf-8") as f:
f.writelines([
"import os",
"import sys",
"import os\n",
"import sys\n",
("sys.path.append("
"os.path.dirname(os.path.realpath(__file__)))")
"os.path.dirname(os.path.realpath(__file__)))\n")
])

if data_source == "k9":
Expand Down
2 changes: 1 addition & 1 deletion src/common/py_libs/resource_validation_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def validate_resources(
if isinstance(ex, NotFound):
logging.error("🛑 Storage bucket `%s` doesn't exist. 🛑",
bucket.name)
elif isinstance(ex, Unauthorized, Forbidden):
elif isinstance(ex, (Unauthorized, Forbidden)):
if checking_on_writing:
logging.error("🛑 Storage bucket `%s` "
"is not writable. 🛑", bucket.name)
Expand Down
11 changes: 9 additions & 2 deletions src/raw_dag_generator/templates/airflow_dag_sfdc_to_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import os

from airflow import DAG
from airflow import __version__ as airflow_version
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from packaging.version import Version

# Use dynamic import to account for Airflow directory structure limitations.
_THIS_DIR = os.path.dirname(os.path.realpath(__file__))
Expand All @@ -44,15 +46,20 @@
"retry_delay": timedelta(minutes=10),
}

if Version(airflow_version) >= Version("2.4.0"):
schedule_kwarg = {"schedule": "${load_frequency}"}
else:
schedule_kwarg = {"schedule_interval": "${load_frequency}"}

with DAG(dag_id=_IDENTIFIER,
description=(
"Data extraction from Salesforce system to BQ RAW dataset "
"for '${base_table}' object"),
default_args=default_args,
schedule_interval="${load_frequency}",
tags=["sfdc", "raw"],
catchup = False,
max_active_runs=1) as dag:
max_active_runs=1,
**schedule_kwarg) as dag:
start_task = EmptyOperator(task_id="start")

extract_data = PythonOperator(
Expand Down