diff --git a/cloudbuild.sfdc.yaml b/cloudbuild.sfdc.yaml index fbcc751..9e18b0d 100644 --- a/cloudbuild.sfdc.yaml +++ b/cloudbuild.sfdc.yaml @@ -94,8 +94,8 @@ steps: # Copy generated SFDC DAG sql files to Target GCS bucket if [[ $(find generated_sql -type f 2> /dev/null | wc -l) -gt 0 ]] then - echo "Copying SFDC SQL files to gs://${_TGT_BUCKET_}/dags/sfdc." - gsutil -m cp -r './generated_sql/*' gs://${_TGT_BUCKET_}/dags/ + echo "Copying SFDC SQL files to gs://${_TGT_BUCKET_}/dags/ (sfdc)." + gcloud storage cp --recursive './generated_sql/*' gs://${_TGT_BUCKET_}/dags/ echo "✅ SFDC SQL files have been copied." else echo "🔪No SQL files found under generated_sql/sfdc directory or the directory does not exist. Skipping copy.🔪" @@ -103,8 +103,8 @@ steps: # Copy generated SFDC DAG python and related files to Target GCS bucket if [[ $(find generated_dag -type f 2> /dev/null | wc -l) -gt 0 ]] then - echo "Copying SFDC DAG files to gs://${_TGT_BUCKET_}/dags/sfdc." - gsutil -m cp -r './generated_dag/*' gs://${_TGT_BUCKET_}/dags/ + echo "Copying SFDC DAG files to gs://${_TGT_BUCKET_}/dags/ (sfdc)." + gcloud storage cp --recursive './generated_dag/*' gs://${_TGT_BUCKET_}/dags/ echo "✅ SFDC DAG files have been copied." else echo "🔪No Python files found under generated_dag/sfdc directory or the directory does not exist. Skipping copy.🔪" diff --git a/src/cdc_dag_generator/templates/airflow_dag_raw_to_cdc.py b/src/cdc_dag_generator/templates/airflow_dag_raw_to_cdc.py index f5d6210..036b330 100644 --- a/src/cdc_dag_generator/templates/airflow_dag_raw_to_cdc.py +++ b/src/cdc_dag_generator/templates/airflow_dag_raw_to_cdc.py @@ -27,20 +27,20 @@ from pendulum import UTC from airflow import DAG -from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator +from airflow import __version__ as airflow_version +try: + from airflow.api.common.trigger_dag import trigger_dag +except ImportError: + from airflow.api.common.experimental.trigger_dag import trigger_dag from airflow.exceptions import AirflowRescheduleException -from airflow.models.dagrun import DagRun from airflow.models.dagbag import DagBag +from airflow.models.dagrun import DagRun from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator -from airflow.utils.state import State -from airflow.utils.db import provide_session +from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator from airflow.utils import timezone - -try: - from airflow.api.common.trigger_dag import trigger_dag -except ImportError: - from airflow.api.common.experimental.trigger_dag import trigger_dag +from airflow.utils.db import provide_session +from packaging.version import Version _RAW_WAITING_TIMEOUT_MINUTES = 10 @@ -69,18 +69,19 @@ def check_raw_if_deployed(session=None, **kwargs): del kwargs now = Pendulum.now(UTC) - active_runs = DagRun.find(dag_id=_RAW_DAG_ID, state=State.RUNNING) + active_runs = DagRun.find(dag_id=_RAW_DAG_ID, state="running") if active_runs and len(active_runs) > 0: logging.info("Rescheduling to wait for an active run of the Raw DAG.") raise AirflowRescheduleException(now + timedelta( minutes=_RAW_WAITING_TIMEOUT_MINUTES)) complete_runs: list[DagRun] = DagRun.find(dag_id=_RAW_DAG_ID, - state=State.SUCCESS) + state="success") run_raw_now = True if complete_runs and len(complete_runs) > 0: - if (now - complete_runs[-1].execution_date - ).total_hours() < _RAW_AGE_HOURS_MAX: + run_date = getattr(complete_runs[-1], "logical_date", + getattr(complete_runs[-1], "execution_date", None)) + if (now - run_date).total_hours() < _RAW_AGE_HOURS_MAX: run_raw_now = False logging.info("Found a recent run of the Raw DAG.") @@ -91,26 +92,36 @@ def check_raw_if_deployed(session=None, **kwargs): logging.info("No Raw DAG %s found.", _RAW_DAG_ID) return logging.info("Starting a new run of the Raw DAG") - trigger_dag( - dag_id=_RAW_DAG_ID, - run_id=f"forced__{now.isoformat()}", - conf=None, - execution_date=timezone.utcnow(), - replace_microseconds=False, - ) + trigger_kwargs = { + "dag_id": _RAW_DAG_ID, + "run_id": f"forced__{now.isoformat()}", + "conf": None, + } + if Version(airflow_version) >= Version("3.0.0"): + logical_date = timezone.utcnow().replace(microsecond=0) + trigger_kwargs["logical_date"] = logical_date + else: + trigger_kwargs["execution_date"] = timezone.utcnow() + trigger_kwargs["replace_microseconds"] = False + trigger_dag(**trigger_kwargs) logging.info("Rescheduling to wait for a new run of the Raw DAG.") raise AirflowRescheduleException(now + timedelta( minutes=_RAW_WAITING_TIMEOUT_MINUTES)) +if Version(airflow_version) >= Version("2.4.0"): + schedule_kwarg = {"schedule": "${load_frequency}"} +else: + schedule_kwarg = {"schedule_interval": "${load_frequency}"} + with DAG(dag_id=_IDENTIFIER, description=( "Merge from Salesforce RAW BQ dataset to CDC BQ dataset for " "'${project_id}.${cdc_dataset}.${base_table}' table"), default_args=default_args, - schedule_interval="${load_frequency}", catchup=False, tags=["sfdc","cdc"], - max_active_runs=1) as dag: + max_active_runs=1, + **schedule_kwarg) as dag: check_raw = PythonOperator(task_id="check_" + _RAW_DAG_ID, python_callable=check_raw_if_deployed, dag=dag) diff --git a/src/common/materializer/deploy.sh b/src/common/materializer/deploy.sh index 54fab27..bc9ad53 100755 --- a/src/common/materializer/deploy.sh +++ b/src/common/materializer/deploy.sh @@ -220,8 +220,8 @@ echo "generate_dependent_dags.py completed successfully." if [[ $(find generated_materializer_dag_files/*/*/task_dep_dags -type f 2> /dev/null | wc -l) -gt 0 ]] then echo "Copying DAG files to GCS bucket..." - echo "gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${GCS_TGT_BUCKET}/dags/" - gsutil -m cp -r 'generated_materializer_dag_files/*' "gs://${GCS_TGT_BUCKET}/dags/" + echo "gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${GCS_TGT_BUCKET}/dags/" + gcloud storage cp --recursive 'generated_materializer_dag_files/*' "gs://${GCS_TGT_BUCKET}/dags/" else echo "No task dependent DAG files to copy to GCS bucket!" fi diff --git a/src/common/materializer/templates/airflow_dag_template_reporting.py b/src/common/materializer/templates/airflow_dag_template_reporting.py index ddadaf1..7704ada 100644 --- a/src/common/materializer/templates/airflow_dag_template_reporting.py +++ b/src/common/materializer/templates/airflow_dag_template_reporting.py @@ -24,6 +24,8 @@ from datetime import timedelta import airflow +from airflow import __version__ as airflow_version +from packaging.version import Version from airflow.operators.empty import EmptyOperator from airflow.providers.google.cloud.operators.bigquery import \ BigQueryInsertJobOperator @@ -34,18 +36,23 @@ default_dag_args = { "depends_on_past": False, - "start_date": datetime(${year}, ${month}, ${day}), + "start_date": datetime(int("${year}"), int("${month}"), int("${day}")), "catchup": False, "retries": 1, "retry_delay": timedelta(minutes=30), } +if Version(airflow_version) >= Version("2.4.0"): + schedule_kwarg = {"schedule": "${load_frequency}"} +else: + schedule_kwarg = {"schedule_interval": "${load_frequency}"} + with airflow.DAG("${dag_full_name}", default_args=default_dag_args, catchup=False, max_active_runs=1, - schedule_interval="${load_frequency}", - tags=${tags}) as dag: + tags=ast.literal_eval("${tags}"), + **schedule_kwarg) as dag: start_task = EmptyOperator(task_id="start") refresh_table = BigQueryInsertJobOperator( task_id="refresh_table", diff --git a/src/common/materializer/templates/airflow_task_dep_dag_template_reporting.py b/src/common/materializer/templates/airflow_task_dep_dag_template_reporting.py index bb01d39..21c5637 100644 --- a/src/common/materializer/templates/airflow_task_dep_dag_template_reporting.py +++ b/src/common/materializer/templates/airflow_task_dep_dag_template_reporting.py @@ -24,6 +24,8 @@ from datetime import timedelta import airflow +from airflow import __version__ as airflow_version +from packaging.version import Version from airflow.operators.empty import EmptyOperator from airflow.providers.google.cloud.operators.bigquery import \ BigQueryInsertJobOperator @@ -36,18 +38,23 @@ default_dag_args = { "depends_on_past": False, - "start_date": datetime(${year}, ${month}, ${day}), + "start_date": datetime(int("${year}"), int("${month}"), int("${day}")), "catchup": False, "retries": 1, "retry_delay": timedelta(minutes=30), } +if Version(airflow_version) >= Version("2.4.0"): + schedule_kwarg = {"schedule": "${load_frequency}"} +else: + schedule_kwarg = {"schedule_interval": "${load_frequency}"} + with airflow.DAG("${dag_full_name}", default_args=default_dag_args, catchup=False, max_active_runs=1, - schedule_interval="${load_frequency}", - tags=${tags}) as dag: + tags=ast.literal_eval("${tags}"), + **schedule_kwarg) as dag: start_task = EmptyOperator(task_id="start") diff --git a/src/common/materializer/templates/cloudbuild_materializer.yaml.jinja b/src/common/materializer/templates/cloudbuild_materializer.yaml.jinja index d7cf88c..6459ab3 100644 --- a/src/common/materializer/templates/cloudbuild_materializer.yaml.jinja +++ b/src/common/materializer/templates/cloudbuild_materializer.yaml.jinja @@ -88,8 +88,8 @@ steps: if [[ $(find generated_materializer_dag_files -type f 2> /dev/null | wc -l) -gt 0 ]] then echo "Copying DAG files to GCS bucket..." - echo "gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/" - gsutil -m cp -r 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/ + echo "gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/" + gcloud storage cp --recursive 'generated_materializer_dag_files/*' gs://${_GCS_TGT_BUCKET}/dags/ else echo "No files to copy to GCS bucket!" fi diff --git a/src/common/py_libs/k9_deployer.py b/src/common/py_libs/k9_deployer.py index 719c76c..165e1ac 100644 --- a/src/common/py_libs/k9_deployer.py +++ b/src/common/py_libs/k9_deployer.py @@ -82,10 +82,10 @@ def _simple_process_and_upload(k9_id: str, k9_dir: str, jinja_dict: dict, if "__init__.py" not in [str(p.relative_to(k9_dir)) for p in k9_files]: with open(f"{tmp_dir}/__init__.py", "w", encoding="utf-8") as f: f.writelines([ - "import os", - "import sys", + "import os\n", + "import sys\n", ("sys.path.append(" - "os.path.dirname(os.path.realpath(__file__)))") + "os.path.dirname(os.path.realpath(__file__)))\n") ]) if data_source == "k9": diff --git a/src/common/py_libs/resource_validation_helper.py b/src/common/py_libs/resource_validation_helper.py index 0072e3d..adbfd6a 100644 --- a/src/common/py_libs/resource_validation_helper.py +++ b/src/common/py_libs/resource_validation_helper.py @@ -128,7 +128,7 @@ def validate_resources( if isinstance(ex, NotFound): logging.error("🛑 Storage bucket `%s` doesn't exist. 🛑", bucket.name) - elif isinstance(ex, Unauthorized, Forbidden): + elif isinstance(ex, (Unauthorized, Forbidden)): if checking_on_writing: logging.error("🛑 Storage bucket `%s` " "is not writable. 🛑", bucket.name) diff --git a/src/raw_dag_generator/templates/airflow_dag_sfdc_to_raw.py b/src/raw_dag_generator/templates/airflow_dag_sfdc_to_raw.py index 0ba0a47..e7bbc4f 100644 --- a/src/raw_dag_generator/templates/airflow_dag_sfdc_to_raw.py +++ b/src/raw_dag_generator/templates/airflow_dag_sfdc_to_raw.py @@ -21,8 +21,10 @@ import os from airflow import DAG +from airflow import __version__ as airflow_version from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator +from packaging.version import Version # Use dynamic import to account for Airflow directory structure limitations. _THIS_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -44,15 +46,20 @@ "retry_delay": timedelta(minutes=10), } +if Version(airflow_version) >= Version("2.4.0"): + schedule_kwarg = {"schedule": "${load_frequency}"} +else: + schedule_kwarg = {"schedule_interval": "${load_frequency}"} + with DAG(dag_id=_IDENTIFIER, description=( "Data extraction from Salesforce system to BQ RAW dataset " "for '${base_table}' object"), default_args=default_args, - schedule_interval="${load_frequency}", tags=["sfdc", "raw"], catchup = False, - max_active_runs=1) as dag: + max_active_runs=1, + **schedule_kwarg) as dag: start_task = EmptyOperator(task_id="start") extract_data = PythonOperator(