diff --git a/README.md b/README.md index f9809a4..6779585 100644 --- a/README.md +++ b/README.md @@ -623,7 +623,95 @@ For example, `/ed-fi/apiClients/districts-2425-ds5/{tenant_code}/prod/Stadium` w +## SharefileTransferToSnowflakeDagBuilder +`SharefileTransferToSnowflakeDagBuilder` builds an Airflow DAG that automates the process of transferring files from ShareFile to Snowflake. The DAG retrieves files from a specified ShareFile location, optionally transforms CSV files into JSONL format, uploads the files to an S3 bucket, and finally loads the data into a Snowflake database. +
+Arguments: + +----- + +| Argument | Description | +|-------------------------|--------------------------------------------------------------------------| +| dag_id | Unique identifier for the DAG | +| airflow_default_args | Default Airflow arguments for the DAG | +| file_sources | Dictionary of file sources and their ShareFile paths (see the Example Yaml File section below) | +| local_base_path | Base path for storing downloaded files locally | +| sharefile_conn_id | Airflow connection ID for ShareFile | +| base_s3_destination_key | S3 key prefix for storing uploaded files | +| s3_conn_id | Airflow connection ID for AWS S3 | +| snowflake_conn_id | Airflow connection ID for Snowflake | +| database | Snowflake database where data will be loaded | +| schema | Snowflake schema where data will be loaded | +| full_refresh | Boolean flag indicating whether to perform a full table refresh | +| schedule_interval | DAG execution schedule (default is None) | +| transform_csv_to_jsonl | Boolean flag to convert CSV files to JSONL before uploading to S3 | +| delete_remote | Boolean flag to delete the original file from ShareFile after transfer | +| delete_local_csv | Boolean flag to delete the local CSV file after transformation | + +Additional keyword arguments passed to `build_sharefile_to_snowflake_dag()` are forwarded to the built-in task operators it creates. Note that extra kwargs given to the constructor (e.g. `slack_conn_id`) are accepted but are not currently forwarded to `EACustomDAG`. + +----- + +
+ +
+Example Yaml File: + +```yaml +default_args: + owner: + run_as_user: + depends_on_past: + start_date: + email: + email_on_failure: False + retries: 0 + trigger_rule: + retry_delay: + execution_timeout: + sla: + +connections: + s3: + s3_conn_id: + s3_bucket : + s3_region : + base_s3_destination_key: + snowflake: + snowflake_conn_id: + snowflake_stage : + snowflake_db : + snowflake_schema : + sharefile: + sharefile_conn_id: + sharefile_base_path: + +variables: + tmp_dir: + local_path: + schedule_interval: + delete_remote: + +file_sources: + filename1: + sharefile_path: /Path/to/document/folder/ + dest_table: schema.table + truncate: True + colnames: + - col1 + - col2 + - col3 + filename2: + sharefile_path: /Path/to/document/folder/ + dest_table: schema.table + truncate: True + colnames: + - col1 + - col2 + - col3 +``` +
# Providers Finally, this package contains a handful of custom DBT operators to be used as an alternative to PythonOperators. diff --git a/ea_airflow_util/dags/s3_to_snowflake_dag.py b/ea_airflow_util/dags/s3_to_snowflake_dag.py index a9e8c71..9513c58 100644 --- a/ea_airflow_util/dags/s3_to_snowflake_dag.py +++ b/ea_airflow_util/dags/s3_to_snowflake_dag.py @@ -36,7 +36,7 @@ def __init__(self, is_manual_upload: bool = False, pool: str, - full_replace: bool = False, #TODO once on latest version of airflow, use dagrun parameter to allow full_replace runs even if not set here at dag level + full_replace: bool = False, do_delete_from_source: bool = True, **kwargs diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py new file mode 100644 index 0000000..1c11b37 --- /dev/null +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -0,0 +1,213 @@ +import logging +import os +from datetime import datetime +from typing import Callable, List, Optional, Union + +from airflow import DAG +from airflow.models import Param +from airflow.operators.python import PythonOperator + +from edu_edfi_airflow.callables import s3 + +from ea_airflow_util.callables.airflow import skip_if_not_in_params_list +from ea_airflow_util.callables import jsonl, snowflake +from ea_airflow_util.providers.sharefile.transfers.sharefile_to_disk import SharefileToDiskOperator +from ea_airflow_util.providers.aws.operators.s3 import S3ToSnowflakeOperator +from ea_airflow_util.callables.airflow import xcom_pull_template +from ea_airflow_util.providers.sharefile.hooks.sharefile import SharefileHook + +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from ea_airflow_util import EACustomDAG + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) + +class SharefileTransferToSnowflakeDagBuilder: + """ + This class is responsible for creating an Apache Airflow DAG 
that automates the process of transferring + files from ShareFile to Snowflake. + + Parameters: + - dag_id (str): ID of the DAG to be created. + - airflow_default_args (dict): Default arguments to pass to the DAG. + - file_sources (dict): A mapping of file source names to their ShareFile paths. + - local_base_path (str): Base local path for downloading files. + - sharefile_conn_id (str): Airflow connection ID for ShareFile. + - base_s3_destination_key (str): Base S3 key (path prefix) for uploads. + - s3_conn_id (str): Airflow connection ID for AWS S3. + - snowflake_conn_id (str): Airflow connection ID for Snowflake. + - database (str): Snowflake database name. + - schema (str): Snowflake schema name. + - full_refresh (bool): If True, performs a full refresh load in Snowflake. + - schedule_interval (str or timedelta, optional): Airflow schedule interval for the DAG. + - transform_csv_to_jsonl (bool, optional): If True, converts downloaded CSV files to JSONL format. + - delete_remote (bool, optional): If True, deletes the original file from ShareFile after download. + - delete_local_csv (bool, optional): If True, deletes the CSV after transforming to JSONL. + - transfer_s3_to_snowflake (BaseOperator, optional): Custom operator for the S3 to Snowflake transfer step. + - **kwargs: Additional arguments passed to Airflow operators. 
+ """ + def __init__(self, + dag_id: str, + airflow_default_args: dict, + file_sources: dict, + local_base_path: str, + sharefile_conn_id: str, + base_s3_destination_key: str, + s3_conn_id: str, + snowflake_conn_id: str, + database: str, + schema: str, + full_refresh: bool, + schedule_interval = None, + transform_csv_to_jsonl: bool = False, + delete_remote: bool = False, + delete_local_csv: bool = False, + transfer_s3_to_snowflake = None, + **kwargs + ): + self.dag_id = dag_id + self.airflow_default_args = airflow_default_args + self.file_sources = file_sources + self.schedule_interval = schedule_interval + + self.local_base_path = local_base_path + self.transform_csv_to_jsonl = transform_csv_to_jsonl + + self.sharefile_conn_id = sharefile_conn_id + self.delete_remote = delete_remote + + self.delete_local_csv = delete_local_csv + + self.base_s3_destination_key = base_s3_destination_key + self.s3_conn_id = s3_conn_id + + self.snowflake_conn_id = snowflake_conn_id + self.database = database + self.schema = schema + self.full_refresh = full_refresh + self.transfer_s3_to_snowflake = transfer_s3_to_snowflake + + self.logger = logging.getLogger(self.__class__.__name__) + + self.params_dict = { + "file_sources": Param( + default=list(self.file_sources.keys()) if self.file_sources else [], + examples=list(self.file_sources.keys()), + type="array", + description="Newline-separated list of file sources to pull from ShareFile", + ), + } + + self.dag = EACustomDAG(dag_id=self.dag_id, + default_args=self.airflow_default_args, + schedule_interval=self.schedule_interval, + params=self.params_dict, + catchup=False + ) + + def build_python_operator(self, + python_callable: Callable, + **kwargs + ) -> PythonOperator: + """ + Optional Python preprocessing operator to run before Earthmover and Lightbeam. 
+ + :param python_callable: + :param kwargs: + :return: + """ + callable_name = python_callable.__name__.strip('<>') # Remove brackets around lambdas + task_id = f"{self.run_type}__preprocess_python_callable__{callable_name}" + + return PythonOperator( + task_id=task_id, + python_callable=python_callable, + op_kwargs=kwargs, + provide_context=True, + pool=self.pool, + dag=self.dag + ) + + def build_sharefile_to_snowflake_dag(self, **kwargs): + """ + Builds the DAG with tasks for each file source, including: + - Conditional execution based on DAG parameters. + - Downloading from ShareFile to local disk (supports both folder and single file paths). + - Optional transformation from CSV to JSONL. + - Upload to S3. + - Load into Snowflake using the default or a custom operator. + + Returns: + airflow.DAG: The constructed Airflow DAG instance. + """ + for file, details in self.file_sources.items(): + + check_if_file_in_param = PythonOperator( + task_id=f"check_{file}", + python_callable=skip_if_not_in_params_list, + op_kwargs={ + 'param_name': "file_sources", + 'value': file + }, + dag=self.dag, + **kwargs + ) + + transfer_sharefile_to_disk = SharefileToDiskOperator( + task_id=f"transfer_{file}_to_disk", + sharefile_conn_id=self.sharefile_conn_id, + sharefile_path=details['sharefile_path'], + local_path=os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file), + delete_remote=self.delete_remote, + dag=self.dag, + **kwargs + ) + + transform_to_jsonl = PythonOperator( + task_id=f"transform_{file}_to_jsonl", + python_callable=jsonl.translate_csv_file_to_jsonl, + op_kwargs={ + 'local_path': xcom_pull_template(transfer_sharefile_to_disk), + 'output_path': None, + 'delete_csv': self.delete_local_csv, + 'metadata_dict': {'file_source': file}, + }, + dag=self.dag, + **kwargs + ) + + transfer_disk_to_s3 = PythonOperator( + task_id=f"transfer_{file}_to_s3", + python_callable=s3.local_filepath_to_s3, + op_kwargs={ + 'local_filepath': 
xcom_pull_template(transfer_sharefile_to_disk), + 's3_destination_key': f"{self.base_s3_destination_key}/" + '{{ds_nodash}}' + "/" + '{{ts_nodash}}' + f"/{file}", + 's3_conn_id': self.s3_conn_id + }, + dag=self.dag, + **kwargs + ) + + if not self.transfer_s3_to_snowflake: + transfer_s3_to_snowflake = S3ToSnowflakeOperator( + task_id=f"{file}_s3_to_snowflake", + snowflake_conn_id=self.snowflake_conn_id, + database=self.database, + schema=self.schema, + table_name=file, + s3_destination_key=self.base_s3_destination_key, + full_refresh=self.full_refresh, + dag=self.dag, + **kwargs + ) + else: + transfer_s3_to_snowflake = self.build_python_operator(self.transfer_s3_to_snowflake) + + if self.transform_csv_to_jsonl: + check_if_file_in_param >> transfer_sharefile_to_disk >> transform_to_jsonl >> transfer_disk_to_s3 >> transfer_s3_to_snowflake + else: + check_if_file_in_param >> transfer_sharefile_to_disk >> transfer_disk_to_s3 >> transfer_s3_to_snowflake + + return self.dag \ No newline at end of file diff --git a/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py b/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py index 70184b9..6b84afd 100644 --- a/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py +++ b/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py @@ -50,92 +50,127 @@ def execute(self, **context): sf_hook = SharefileHook(sharefile_conn_id=self.sharefile_conn_id) sf_hook.get_conn() - # get the item id of the remote path, find all files within that path (up to 1000) - base_path_id = sf_hook.get_path_id(self.sharefile_path) - remote_files = sf_hook.find_files(base_path_id) - - # check whether we found anything - if len(remote_files) == 0: - self.log.info("No files on FTP") - raise AirflowSkipException - - # extract relevant file details - files = [] - for res in remote_files: - file_details = { - 'file_name': res['FileName'], - 'size': res['Size'], - 'parent_id': res['ParentID'], - 'file_path_no_base': 
res['ParentSemanticPath'].replace(self.sharefile_path, ''), - 'file_path_ftp': res['ParentSemanticPath'], - 'item_id': res['ItemID'] - } - - - files.append(file_details) - - if self.most_recent_file: - # of files found in directory, find the one with the most recent edit timestamp - max_timestamp = None - chosen_file = [] - for res in files: - # seem to be cases where search is out of date and returns items that don't exist - try: - item_info = sf_hook.item_info(res['item_id']) - except: - # if the item fails to fetch item info, it probably doesn't exist, so can't be most recent - continue - # grab last modified, compare to current max known - item_last_modified = item_info['ProgenyEditDate'] - if max_timestamp is None or item_last_modified > max_timestamp: - max_timestamp = item_last_modified - chosen_file = [res] - # overwrite files list with singular chosen item - files = chosen_file - - - # for all files, move to local - num_successes = 0 - - for file in files: + # get the item ID of the privided path (either file or folder) + item_id = sf_hook.get_path_id(self.sharefile_path) + if not item_id: + raise AirflowException(f"Cound not find item ID for path: {self.sharefile_path}") + + try: + item_info = sf_hook.item_info(item_id) + except Exception as e: + raise AirflowException(f"Failed to retrieve metadata for item {item_id}: {e}") + + # Case 1: single file + if item_info["odata.type"] == "ShareFile.Api.Models.File": + file_name = item_info['FileName'] + full_local_path = os.path.join(self.local_path, file_name) - remote_file = os.path.join(file['file_path_ftp'], file['file_name']) - self.log.info("Attempting to get file " + remote_file) - - # lower filename and replace spaces with underscores - file['file_name'] = file['file_name'].lower().replace(' ', '_') - - # check to see if there is other metadata needed in local path and if not, add filename to local path - if file['parent_id'] == base_path_id: - full_local_path = os.path.join(self.local_path, 
file['file_name']) - else: - full_local_path = os.path.join(self.local_path, file['file_path_no_base'], file['file_name']) - - # create dir (works if there is a file name or not) os.makedirs(os.path.dirname(full_local_path), exist_ok=True) - # download the file try: - sf_hook.download_to_disk(item_id=file['item_id'], local_path=full_local_path) - + sf_hook.download_to_disk(item_id=item_id, local_path=full_local_path) if self.delete_remote: - sf_hook.delete(file['item_id']) - - num_successes += 1 - + sf_hook.delete(item_id) + + return self.local_path + except Exception as err: - self.log.error(f'Failed to get file with message: {err}') - + self.log.error(f"Failed to download file {self.sharefile_path}: {err}") if slack_conn_id := context["dag"].user_defined_macros.get("slack_conn_id"): slack.slack_alert_download_failure( context=context, http_conn_id=slack_conn_id, - remote_path=remote_file, local_path=full_local_path, error=err + remote_path=self.sharefile_path, local_path=full_local_path, error=err ) + raise AirflowException("Single file transfer failed") + + else: + + # get the item id of the remote path, find all files within that path (up to 1000) + base_path_id = item_id + remote_files = sf_hook.find_files(base_path_id) + + # check whether we found anything + if len(remote_files) == 0: + self.log.info("No files on FTP") + raise AirflowSkipException + + # extract relevant file details + files = [] + for res in remote_files: + file_details = { + 'file_name': res['FileName'], + 'size': res['Size'], + 'parent_id': res['ParentID'], + 'file_path_no_base': res['ParentSemanticPath'].replace(self.sharefile_path, ''), + 'file_path_ftp': res['ParentSemanticPath'], + 'item_id': res['ItemID'] + } + + + files.append(file_details) + + if self.most_recent_file: + # of files found in directory, find the one with the most recent edit timestamp + max_timestamp = None + chosen_file = [] + for res in files: + # seem to be cases where search is out of date and returns items that 
don't exist + try: + item_info = sf_hook.item_info(res['item_id']) + except: + # if the item fails to fetch item info, it probably doesn't exist, so can't be most recent + continue + # grab last modified, compare to current max known + item_last_modified = item_info['ProgenyEditDate'] + if max_timestamp is None or item_last_modified > max_timestamp: + max_timestamp = item_last_modified + chosen_file = [res] + # overwrite files list with singular chosen item + files = chosen_file + + + # for all files, move to local + num_successes = 0 + + for file in files: + + remote_file = os.path.join(file['file_path_ftp'], file['file_name']) + self.log.info("Attempting to get file " + remote_file) + + # lower filename and replace spaces with underscores + file['file_name'] = file['file_name'].lower().replace(' ', '_') + + # check to see if there is other metadata needed in local path and if not, add filename to local path + if file['parent_id'] == base_path_id: + full_local_path = os.path.join(self.local_path, file['file_name']) + else: + full_local_path = os.path.join(self.local_path, file['file_path_no_base'], file['file_name']) + + # create dir (works if there is a file name or not) + os.makedirs(os.path.dirname(full_local_path), exist_ok=True) + + # download the file + try: + sf_hook.download_to_disk(item_id=file['item_id'], local_path=full_local_path) + + if self.delete_remote: + sf_hook.delete(file['item_id']) - continue + num_successes += 1 + + except Exception as err: + self.log.error(f'Failed to get file with message: {err}') + + if slack_conn_id := context["dag"].user_defined_macros.get("slack_conn_id"): + slack.slack_alert_download_failure( + context=context, http_conn_id=slack_conn_id, + remote_path=remote_file, local_path=full_local_path, error=err + ) + + continue - if num_successes == 0: - raise AirflowException("Failed transfer from ShareFile to local: no files transferred successfully!") + if num_successes == 0: + raise AirflowException("Failed transfer from 
ShareFile to local: no files transferred successfully!") - return self.local_path + return self.local_path